Yep, pretty sure. I'm producing them with the internal kafka-producer.sh, the same method I used for the initial messages (before the first launch of the topology), and those got consumed and processed just fine.
As for maxSpoutPending, I first tried 10, then removed it (leaving the default value).

On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <dani...@schiavuzzi.com> wrote:

> Are you sure you are producing new messages into the same Kafka
> topic? What number did you set maxSpoutPending to?
>
> On Tuesday, July 8, 2014, Miloš Solujić <milos.solu...@gmail.com> wrote:
>
>> Thanks Danijel for your quick proposition.
>>
>> I tried lowering down and removing all performance settings (those were
>> left from load testing on one machine)
>>
>> Still same result: no matter what, new messages are not taken from kafka
>> after topology is redeployed.
>>
>>
>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <
>> dani...@schiavuzzi.com> wrote:
>>
>>> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10).
>>> In Trident, setMaxSpoutPending refers to the number of batches, not tuples
>>> like in plain Storm. Too high values may cause blockages like the one you
>>> describe.
>>>
>>>
>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solu...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm pretty new to storm and kafka/zookeeper, and I hope that my
>>>> question is not too dumb.
>>>> Here it goes:
>>>>
>>>> I'm using latest stable storm and storm-kafka = 0.9.2-incubating
>>>>
>>>> I've set up test cluster using wirbelsturm tool with unchanged yaml
>>>> (just uncommented kafka machine)
>>>>
>>>> here is config snippet for my trident topology:
>>>>
>>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
>>>>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk,
>>>>                 "scores");
>>>>
>>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>         kafkaConf.fetchSizeBytes = 10000;
>>>>         kafkaConf.forceFromStart = true;
>>>>
>>>>         Config stormConfig = new Config();
>>>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>>>>         // performance settings
>>>>         stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>>>         stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF,
>>>>                 100);
>>>>         stormConfig.setMaxSpoutPending(100000);
>>>>
>>>>         if (args != null && args.length > 0) {
>>>>             StormSubmitter.submitTopologyWithProgressBar(args[0],
>>>>                     stormConfig,
>>>>                     BuildTridentScoreTopology.build(kafkaConf));
>>>>         } else {...}
>>>>
>>>> Now, I've created 'scores' topic in kafka and pushed few test messages
>>>> prior to starting topology, with kafkaConf.forceFromStart = true. And
>>>> topology processed those messages just fine, and stored them in
>>>> tridentState (Couchbase)
>>>>
>>>> All new messages are simply ignored!
>>>>
>>>> After redeploying topology (both with forceFromStart = true and
>>>> forceFromStart = false) no more messages are ingested from kafka.
>>>>
>>>> here is worker log for one topology deployment and short run
>>>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>>>
>>>> those are VMs that host this storm cluster
>>>> 10.0.0.241 zookeeper1
>>>> 10.0.0.101 supervisor1
>>>> 10.0.0.21 kafka1
>>>> 10.0.0.251 nimbus1
>>>>
>>>> Thanks,
>>>> Milos
>>>>
>>>
>>> --
>>> Danijel Schiavuzzi
>>>
>>> E: dani...@schiavuzzi.com
>>> W: www.schiavuzzi.com
>>> T: +385989035562
>>> Skype: danijels7
>>>
>>
>
> --
> Danijel Schiavuzzi
>
> E: dani...@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
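
For a rough sense of why setMaxSpoutPending(100000) is a large value in Trident: since the setting counts batches rather than tuples, the data that may be in flight scales with the batch size. The sketch below is plain Java arithmetic only and does not touch the Storm API. It assumes each batch fetches at most fetchSizeBytes per Kafka partition, which is an assumption about the spout's behaviour and not something confirmed in this thread. The class and method names (PendingEstimate, maxInFlightBytes) are made up for illustration.

```java
/**
 * Back-of-the-envelope estimate only; hypothetical helper, not part of
 * Storm or storm-kafka.
 */
final class PendingEstimate {
    private PendingEstimate() {}

    /**
     * Approximate upper bound on bytes in flight per partition, assuming
     * every pending batch fetched the full fetchSizeBytes (an assumption).
     */
    static long maxInFlightBytes(long maxSpoutPending, long fetchSizeBytes) {
        if (maxSpoutPending < 0 || fetchSizeBytes < 0) {
            throw new IllegalArgumentException("arguments must be non-negative");
        }
        // multiplyExact throws ArithmeticException instead of silently overflowing
        return Math.multiplyExact(maxSpoutPending, fetchSizeBytes);
    }

    public static void main(String[] args) {
        long fetchSizeBytes = 10_000L; // kafkaConf.fetchSizeBytes from the thread

        // Value from the original config vs. the suggested lower value
        System.out.println("maxSpoutPending=100000: "
                + maxInFlightBytes(100_000L, fetchSizeBytes) + " bytes");
        System.out.println("maxSpoutPending=10: "
                + maxInFlightBytes(10L, fetchSizeBytes) + " bytes");
    }
}
```

Under that assumption, the values from the thread work out to about 1 GB per partition for 100000 pending batches versus about 100 KB for 10, which is consistent with the advice to try a much lower value. It does not by itself explain why new messages are not picked up after a redeploy.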