Re: Reopen KAFKA-4344 ?
Hi Srinivas,

I raised the original issue. The way I got around it was to run Kafka Streams with plain POJOs rather than having some of the dependent instances be Spring-managed beans. If you create the riakService and counterService instances inside the processor class, instead of passing the Spring-managed instances to the processor class constructor, your Kafka Streams initialization should be fine, and it should create the right number of tasks with the right number of processors for all the partitions.

I was fine with the POJO-based approach because Kafka Streams has quite a few APIs to query the state (once it has started correctly, of course). I am running stateful processors and wanted to query the state data all the time. I was just using a Spring Boot controller, for the web container, to proxy the get APIs of the Kafka Streams state store (ReadOnlyKeyValueStore).

Alternatively, you can try making these two services prototype components (if your use case is fine with this).

Hope this helps.

Regards
Sai

On Mon, Nov 7, 2016 at 9:08 AM, Matthias J. Sax wrote:
> KAFKA-4344 was not a bug. The issue was a wrong initialization order
> of Kafka Streams by the user.
>
> Please double check your initialization order (and maybe read the old
> email thread and JIRA comments -- they might have some relevant
> information to help you fix the issue).
>
> If the problem is still there, can you please reduce your code to a
> minimum example that reproduces the problem?
>
> Thanks!
>
> -Matthias
>
> On 11/5/16 3:28 PM, srinivas koniki wrote:
> > Hi, I'm still seeing the same issue with Spring Boot. The code is
> > below; sorry, it is in Groovy and not fully baked. I have just a
> > single processor. It worked well with a single partition, but when
> > I increased the number of partitions, I started seeing the same
> > error as in KAFKA-4344.
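A minimal Java sketch of the workaround described in the reply above, under stated assumptions. It does not use Kafka Streams or Spring at all: `CounterService`, `CountingProcessor`, `createTasks` and `distinctInstances` are hypothetical stand-ins invented for illustration, and the loop in `createTasks` only imitates a runtime that asks a supplier for one processor per partition. It contrasts a supplier that hands back one shared instance with one that builds a fresh processor (which creates its own services) on every call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class ProcessorPerTaskSketch {

    // Hypothetical stand-in for a service that would otherwise be a Spring bean.
    static class CounterService {
        private int count;
        void increment() { count++; }
        int count() { return count; }
    }

    // Hypothetical stand-in for a stream processor bound to one partition.
    static class CountingProcessor {
        // Created inside the processor rather than injected, as the reply suggests.
        private final CounterService counterService = new CounterService();
        private int partition = -1;

        void init(int partition) { this.partition = partition; }
        void process(String value) { counterService.increment(); }
        int partition() { return partition; }
        int processed() { return counterService.count(); }
    }

    // Imitates a runtime that requests one processor per partition from a supplier.
    static List<CountingProcessor> createTasks(Supplier<CountingProcessor> supplier, int partitions) {
        List<CountingProcessor> tasks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            CountingProcessor processor = supplier.get();
            processor.init(p);
            tasks.add(processor);
        }
        return tasks;
    }

    // Counts distinct object identities among the processors handed to the tasks.
    static int distinctInstances(List<CountingProcessor> tasks) {
        Set<CountingProcessor> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        seen.addAll(tasks);
        return seen.size();
    }

    public static void main(String[] args) {
        // Shared instance: every task receives the same object, so the last init() wins.
        CountingProcessor shared = new CountingProcessor();
        List<CountingProcessor> sharedTasks = createTasks(() -> shared, 3);

        // Fresh instance per call: each task owns its processor and its services.
        List<CountingProcessor> freshTasks = createTasks(CountingProcessor::new, 3);

        System.out.println("shared: " + distinctInstances(sharedTasks)
                + " instance(s), task 0 sees partition " + sharedTasks.get(0).partition());
        System.out.println("fresh:  " + distinctInstances(freshTasks)
                + " instance(s), task 0 sees partition " + freshTasks.get(0).partition());
    }
}
```

In the shared case all three tasks end up pointing at one object whose partition was overwritten by the last `init` call; in the fresh case each task keeps its own state. Prototype-scoped components, the alternative mentioned in the reply, aim at the same effect of one instance per request.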
Re: Reopen KAFKA-4344 ?
KAFKA-4344 was not a bug. The issue was a wrong initialization order of Kafka Streams by the user.

Please double check your initialization order (and maybe read the old email thread and JIRA comments -- they might have some relevant information to help you fix the issue).

If the problem is still there, can you please reduce your code to a minimum example that reproduces the problem?

Thanks!

-Matthias

On 11/5/16 3:28 PM, srinivas koniki wrote:
> Hi, I'm still seeing the same issue with Spring Boot. The code is
> below; sorry, it is in Groovy and not fully baked. I have just a
> single processor. It worked well with a single partition, but when
> I increased the number of partitions, I started seeing the same
> error as in KAFKA-4344.
Reopen KAFKA-4344 ?
Hi,

I'm still seeing the same issue with Spring Boot. The code is below; sorry, it is in Groovy and not fully baked. I have just a single processor. It worked well with a single partition, but when I increased the number of partitions, I started seeing the same error as in KAFKA-4344.

import com.codahale.metrics.MetricRegistry
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
import org.apache.kafka.streams.processor.AbstractProcessor
import org.apache.kafka.streams.processor.ProcessorSupplier
import org.apache.kafka.streams.processor.TopologyBuilder
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.AfterReturning
import org.aspectj.lang.annotation.Around
import org.aspectj.lang.annotation.Aspect
import org.aspectj.lang.annotation.Pointcut
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.actuate.metrics.CounterService
import org.springframework.boot.actuate.metrics.GaugeService
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.Lifecycle
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer
import org.springframework.stereotype.Component
import org.springframework.test.context.ContextConfiguration
import org.springframework.util.StopWatch
import spock.lang.Shared
import spock.lang.Specification

import java.util.concurrent.Future
import java.util.stream.IntStream

/**
 * Created by srinivas.koniki on 11/5/16.
 */
@ContextConfiguration(classes=[TestConfig, MetricsAspect, RiakService])
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class MetricsSpec extends Specification {

    static String kafkaTopic = 'testTopic'

    @Shared
    TestConfig testConfigRef

    @Autowired
    TestConfig testConfig

    @Autowired
    MetricRegistry metricRegistry

    @Autowired
    KafkaProducer kafkaProducer

    @Shared
    static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1)

    def setupSpec() {
        println("Heavy init for all the tests...")
        CLUSTER.start()
        System.setProperty('broker.url', CLUSTER.bootstrapServers())
        System.setProperty('zk.url', CLUSTER.zKConnectString())
        System.setProperty('kafka.topic', kafkaTopic)
        CLUSTER.createTopic(kafkaTopic, 3, 1)
    }

    def cleanupSpec() {
        testConfigRef.stop()
        CLUSTER.stop()
    }

    def "Test send and receive"() {
        expect:
        testConfig != null
        metricRegistry != null
        println '' + metricRegistry.getGauges()

        when:
        testConfigRef = testConfig
        testConfig.start()
        List futureList = new ArrayList<>()
        IntStream.range(1, 4).forEach({ i ->
            Future future = kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, 'test' + i, 'testMesg' + i))
            futureList.add(future)
        })

        futureList.forEach({ future -> println future.get() })

        then:
        Thread.sleep(2)

        println '' + metricRegistry.getGauges()
        println '' + metricRegistry.counters
        metricRegistry.counters.keySet().forEach({ key ->
            println key + ':' + metricRegistry.counters.get(key).count
        })
        Thread.sleep(2000)
    }

    @Configuration
    @SpringBootApplication
    static class TestConfig implements Lifecycle {

        @Value('${broker.url}')
        String brokerUrl

        Map producerConfig() {
            def props = [
                "bootstrap.servers": brokerUrl,
                "acks": "all",
                "retries": 0,
                "batch.size": 16384,
                "linger.ms": 1,
                "buffer.memory": 33554432,
                "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
            ]
        }

        @Bean
        KafkaProducer kafkaProducer() {
            new KafkaProducer(producerConfig())
        }

        @Bean
        public static PropertySourcesPlaceholderConfigurer properties() {
            return new PropertySourcesPlaceholderConfigurer()
        }

        @Value('${zk.url}')
        String zkUrl

        @Value('${kafka.topic}')
        String kafkaTopic

        @Autowired
        RiakService riakService

        @Autowired
        CounterService counterService

        KafkaStreams streams

        boolean state

        @Override
        void