Re: Reopen KAFKA-4344 ?

2016-11-07 Thread saiprasad mishra
Hi Srinivas

I raised the issue and the way I got around this was to let kafka streams
run on POJO way rather than some of the dependent instances being spring
managed bean instances.
If you create the instance of riakService and counterService in processor
class instead of passing the spring managed instances to the processor
class constructor your kafka streams initilization should be fine and it
should create the right number of tasks with right number of processors for
all the partitions.

I was fine with POJO based approach as kafka streams has quite a bit of
apis to query the state(of course once it is started correctly) as i am
running stateful processors and i wanted to query the state data all the
time. I was just using spring boot controller for the web container to
proxy the kafka streams state store(ReadOnlyKeyValueStore) get apis.

Alternatively you can try having prototype components for these two
services (if your usecase is fine with this).

Hope this helps.


On Mon, Nov 7, 2016 at 9:08 AM, Matthias J. Sax 

> Hash: SHA512
> KAFKA-4344 was not a bug. The issues was as wrong initialization order
> of Kafka Streams by the user.
> Please double check your initialization order (and maybe read the old
> email thread and JIRA comments -- it might have some relevant
> information for you to fix the issue for you).
> If the problem is still there, can you please reduce your code to a
> minimum example that reproduces the problem?
> Thanks!
> - -Matthias
> On 11/5/16 3:28 PM, srinivas koniki wrote:
> >
> > Hi, I'm still seeing the same issue with spring boot. Code is
> > below, sorry code is in groovy and not fully baked. Just have
> > single processor. It worked well with single partition. But when i
> > increased the partitions, started seeing the error as in this
> > kafka-4344.
> >
> >
> > import com.codahale.metrics.MetricRegistry import
> > org.apache.kafka.clients.consumer.ConsumerConfig import
> > org.apache.kafka.clients.producer.KafkaProducer import
> > org.apache.kafka.clients.producer.ProducerRecord import
> > org.apache.kafka.common.serialization.Serdes import
> > org.apache.kafka.streams.KafkaStreams import
> > org.apache.kafka.streams.StreamsConfig import
> > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
> > import org.apache.kafka.streams.processor.AbstractProcessor import
> > org.apache.kafka.streams.processor.ProcessorSupplier import
> > org.apache.kafka.streams.processor.TopologyBuilder import
> > org.aspectj.lang.ProceedingJoinPoint import
> > org.aspectj.lang.annotation.AfterReturning import
> > org.aspectj.lang.annotation.Around import
> > org.aspectj.lang.annotation.Aspect import
> > org.aspectj.lang.annotation.Pointcut import
> > org.springframework.beans.factory.annotation.Autowired import
> > org.springframework.beans.factory.annotation.Value import
> > org.springframework.boot.actuate.metrics.CounterService import
> > org.springframework.boot.actuate.metrics.GaugeService import
> > org.springframework.boot.autoconfigure.SpringBootApplication import
> > org.springframework.boot.test.context.SpringBootTest import
> > org.springframework.context.Lifecycle import
> > org.springframework.context.annotation.Bean import
> > org.springframework.context.annotation.Configuration import
> > org.springframework.context.annotation.Import import
> >
> er
> >
> >
> import org.springframework.stereotype.Component
> > import org.springframework.test.context.ContextConfiguration import
> > org.springframework.util.StopWatch import spock.lang.Shared import
> > spock.lang.Specification
> >
> > import java.util.concurrent.Future import
> >
> >
> > /** * Created by srinivas.koniki on 11/5/16. */
> > @ContextConfiguration(classes=[TestConfig, MetricsAspect,
> > RiakService]) @SpringBootTest(webEnvironment =
> > SpringBootTest.WebEnvironment.RANDOM_PORT) class MetricsSpec
> > extends Specification{
> >
> > static String kafkaTopic = 'testTopic'
> >
> > @Shared TestConfig testConfigRef
> >
> > @Autowired TestConfig testConfig
> >
> > @Autowired MetricRegistry metricRegistry
> >
> > @Autowired KafkaProducer kafkaProducer
> >
> > @Shared static final EmbeddedKafkaCluster CLUSTER = new
> > EmbeddedKafkaCluster(1)
> >
> > def setupSpec() { println("Heavy init for all the tests...")
> > CLUSTER.start()
> > System.setProperty('broker.url',CLUSTER.bootstrapServers())
> > System.setProperty('zk.url',CLUSTER.zKConnectString())
> > System.setProperty('kafka.topic',kafkaTopic)
> > CLUSTER.createTopic(kafkaTopic, 3, 1) }
> >
> > def cleanupSpec() { testConfigRef.stop() CLUSTER.stop() }
> >
> > def "Test send and receive" (){ expect: testConfig != null
> > metricRegistry != null println ''+metricRegistry.getGauges()
> >
> > when: testConfigRef = testConfig testConfig.start() List
> > futureList = new ArrayList<>() IntStream.rang

Re: Reopen KAFKA-4344 ?

2016-11-07 Thread Matthias J. Sax
Hash: SHA512

KAFKA-4344 was not a bug. The issues was as wrong initialization order
of Kafka Streams by the user.

Please double check your initialization order (and maybe read the old
email thread and JIRA comments -- it might have some relevant
information for you to fix the issue for you).

If the problem is still there, can you please reduce your code to a
minimum example that reproduces the problem?


- -Matthias

On 11/5/16 3:28 PM, srinivas koniki wrote:
> Hi, I'm still seeing the same issue with spring boot. Code is
> below, sorry code is in groovy and not fully baked. Just have
> single processor. It worked well with single partition. But when i
> increased the partitions, started seeing the error as in this
> kafka-4344.
> import com.codahale.metrics.MetricRegistry import
> org.apache.kafka.clients.consumer.ConsumerConfig import
> org.apache.kafka.clients.producer.KafkaProducer import
> org.apache.kafka.clients.producer.ProducerRecord import
> org.apache.kafka.common.serialization.Serdes import
> org.apache.kafka.streams.KafkaStreams import
> org.apache.kafka.streams.StreamsConfig import
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster 
> import org.apache.kafka.streams.processor.AbstractProcessor import
> org.apache.kafka.streams.processor.ProcessorSupplier import
> org.apache.kafka.streams.processor.TopologyBuilder import
> org.aspectj.lang.ProceedingJoinPoint import
> org.aspectj.lang.annotation.AfterReturning import
> org.aspectj.lang.annotation.Around import
> org.aspectj.lang.annotation.Aspect import
> org.aspectj.lang.annotation.Pointcut import
> org.springframework.beans.factory.annotation.Autowired import
> org.springframework.beans.factory.annotation.Value import
> org.springframework.boot.actuate.metrics.CounterService import
> org.springframework.boot.actuate.metrics.GaugeService import
> org.springframework.boot.autoconfigure.SpringBootApplication import
> org.springframework.boot.test.context.SpringBootTest import
> org.springframework.context.Lifecycle import
> org.springframework.context.annotation.Bean import
> org.springframework.context.annotation.Configuration import
> org.springframework.context.annotation.Import import
import org.springframework.stereotype.Component
> import org.springframework.test.context.ContextConfiguration import
> org.springframework.util.StopWatch import spock.lang.Shared import
> spock.lang.Specification
> import java.util.concurrent.Future import
> /** * Created by srinivas.koniki on 11/5/16. */ 
> @ContextConfiguration(classes=[TestConfig, MetricsAspect,
> RiakService]) @SpringBootTest(webEnvironment =
> SpringBootTest.WebEnvironment.RANDOM_PORT) class MetricsSpec
> extends Specification{
> static String kafkaTopic = 'testTopic'
> @Shared TestConfig testConfigRef
> @Autowired TestConfig testConfig
> @Autowired MetricRegistry metricRegistry
> @Autowired KafkaProducer kafkaProducer
> @Shared static final EmbeddedKafkaCluster CLUSTER = new
> EmbeddedKafkaCluster(1)
> def setupSpec() { println("Heavy init for all the tests...") 
> CLUSTER.start() 
> System.setProperty('broker.url',CLUSTER.bootstrapServers()) 
> System.setProperty('zk.url',CLUSTER.zKConnectString()) 
> System.setProperty('kafka.topic',kafkaTopic) 
> CLUSTER.createTopic(kafkaTopic, 3, 1) }
> def cleanupSpec() { testConfigRef.stop() CLUSTER.stop() }
> def "Test send and receive" (){ expect: testConfig != null 
> metricRegistry != null println ''+metricRegistry.getGauges()
> when: testConfigRef = testConfig testConfig.start() List
> futureList = new ArrayList<>() IntStream.range(1,4).forEach({ i -> 
> Future future = kafkaProducer.send(new ProducerRecord String>(kafkaTopic, 'test'+i, 'testMesg'+i)) })
> futureList.forEach({ future -> println future.get() }) then: 
> Thread.sleep(2)
> println ''+metricRegistry.getGauges() println
> ''+metricRegistry.counters 
> metricRegistry.counters.keySet().forEach({key -> println
> key+':'+metricRegistry.counters.get(key).count }) 
> Thread.sleep(2000) }
> @Configuration @SpringBootApplication static class TestConfig
> implements Lifecycle {
> @Value('${broker.url}') String brokerUrl
> Map producerConfig(){ def props =
> ["bootstrap.servers" : brokerUrl, "acks" : "all", "retries": 0,
> "batch.size": 16384, "": 1, "buffer.memory" : 33554432, 
> "key.serializer":
> "org.apache.kafka.common.serialization.StringSerializer", 
> "value.serializer" :
> "org.apache.kafka.common.serialization.StringSerializer" ] }
> @Bean KafkaProducer kafkaProducer() { new
> KafkaProducer(producerConfig()) }
> @Bean public static PropertySourcesPlaceholderConfigurer
> properties() { return new PropertySourcesPlaceholderConfigurer() }
> @Value('${zk.url}') String zkUrl
> @Value('${kafka.topic}') String kafkaTopic
> @Autowire

Reopen KAFKA-4344 ?

2016-11-07 Thread srinivas koniki

I'm still seeing the same issue with spring boot. Code is below, sorry code is 
in groovy and not fully baked. Just have single processor. It worked well with 
single partition. But when i increased the partitions, started seeing the error 
as in this kafka-4344.

import com.codahale.metrics.MetricRegistry
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster
import org.apache.kafka.streams.processor.AbstractProcessor
import org.apache.kafka.streams.processor.ProcessorSupplier
import org.apache.kafka.streams.processor.TopologyBuilder
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.AfterReturning
import org.aspectj.lang.annotation.Around
import org.aspectj.lang.annotation.Aspect
import org.aspectj.lang.annotation.Pointcut
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.actuate.metrics.CounterService
import org.springframework.boot.actuate.metrics.GaugeService
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.Lifecycle
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.stereotype.Component
import org.springframework.test.context.ContextConfiguration
import org.springframework.util.StopWatch
import spock.lang.Shared
import spock.lang.Specification

import java.util.concurrent.Future

 * Created by srinivas.koniki on 11/5/16.
@ContextConfiguration(classes=[TestConfig, MetricsAspect, RiakService])
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class MetricsSpec extends Specification{

static String kafkaTopic = 'testTopic'

TestConfig testConfigRef

TestConfig testConfig

MetricRegistry metricRegistry

KafkaProducer kafkaProducer

static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1)

def setupSpec() {
println("Heavy init for all the tests...")
CLUSTER.createTopic(kafkaTopic, 3, 1)

def cleanupSpec() {

def "Test send and receive" (){
testConfig != null
metricRegistry != null
println ''+metricRegistry.getGauges()

testConfigRef = testConfig
List futureList = new ArrayList<>()
IntStream.range(1,4).forEach({ i ->
Future future = kafkaProducer.send(new ProducerRecord(kafkaTopic, 'test'+i, 'testMesg'+i))

futureList.forEach({ future ->
   println future.get()

println ''+metricRegistry.getGauges()
println ''+metricRegistry.counters
metricRegistry.counters.keySet().forEach({key ->
println key+':'+metricRegistry.counters.get(key).count

static class TestConfig implements Lifecycle {

String brokerUrl

Map producerConfig(){
def props = ["bootstrap.servers" : brokerUrl, "acks" : "all", 
"retries": 0, "batch.size": 16384, "": 1,
 "buffer.memory" : 33554432,
 "value.serializer" : 

KafkaProducer kafkaProducer() {
new KafkaProducer(producerConfig())

public static PropertySourcesPlaceholderConfigurer properties() {
return new PropertySourcesPlaceholderConfigurer()

String zkUrl

String kafkaTopic

RiakService riakService

CounterService counterService

KafkaStreams streams

boolean state
