Yep, pretty much sure. I'm producing via the internal kafka-producer.sh script; the
same method was used to produce the initial messages (before the first launch of the
topology), and those got consumed and processed just fine.
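
In case it's relevant, here is roughly the equivalent of what I push (a sketch using
the Kafka 0.8 Java producer API; the broker address and payload are placeholders):

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ScoresTestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // assumed broker address, based on the kafka1 host listed below
            props.put("metadata.broker.list", "kafka1:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            // payload is a placeholder; StringScheme on the spout side just
            // needs a UTF-8 string
            producer.send(new KeyedMessage<String, String>("scores", "test-score-1"));
            producer.close();
        }
    }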

As for maxSpoutPending, I first tried it with 10, then removed it entirely (leaving
the default value).


On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <dani...@schiavuzzi.com>
wrote:

> Are you sure you are producing new messages into the same Kafka
> topic? What number did you set maxSpoutPending to?
>
> On Tuesday, July 8, 2014, Miloš Solujić <milos.solu...@gmail.com> wrote:
>
>> Thanks, Danijel, for your quick suggestion.
>>
>> I tried lowering and then removing all of the performance settings (those were
>> left over from load testing on a single machine).
>>
>> Still the same result: no matter what, new messages are not consumed from Kafka
>> after the topology is redeployed.
>>
>>
>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <
>> dani...@schiavuzzi.com> wrote:
>>
>>> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10). In
>>> Trident, setMaxSpoutPending refers to the number of batches, not tuples as in
>>> plain Storm. Too high a value may cause blockages like the one you describe.
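>>>
>>> Only that one line needs to change (10 is an arbitrary but reasonable
>>> starting point for the number of in-flight batches):
>>>
>>>         stormConfig.setMaxSpoutPending(10);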
>>>
>>>
>>> On Tuesday, July 8, 2014, Miloš Solujić <milos.solu...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope that my
>>>> question is not too dumb. Here it goes:
>>>>
>>>> I'm using the latest stable Storm and storm-kafka, 0.9.2-incubating.
>>>>
>>>> I've set up a test cluster using the Wirbelsturm tool with an unchanged YAML
>>>> config (just uncommented the Kafka machine).
>>>>
>>>> Here is the config snippet for my Trident topology:
>>>>
>>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
>>>>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
>>>>
>>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>         kafkaConf.fetchSizeBytes = 10000;
>>>>         kafkaConf.forceFromStart = true;
>>>>
>>>>         Config stormConfig = new Config();
>>>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>>>>
>>>>         // performance settings
>>>>         stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>>>         stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>>>         stormConfig.setMaxSpoutPending(100000);
>>>>
>>>>         if (args != null && args.length > 0) {
>>>>             StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>>>>                     BuildTridentScoreTopology.build(kafkaConf));
>>>>         } else {...}
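>>>>
>>>> BuildTridentScoreTopology.build isn't shown here; a minimal version of it would
>>>> look something like this (a simplified sketch: the "scoresSpout" txId is
>>>> illustrative and the downstream persistence into Couchbase is omitted):
>>>>
>>>>         TridentTopology topology = new TridentTopology();
>>>>         // the txId is the name under which the spout stores its Kafka
>>>>         // offsets in ZooKeeper
>>>>         topology.newStream("scoresSpout", new OpaqueTridentKafkaSpout(kafkaConf));
>>>>         return topology.build();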
>>>>
>>>> Now, I've created the 'scores' topic in Kafka and pushed a few test messages
>>>> prior to starting the topology, with kafkaConf.forceFromStart = true. The
>>>> topology processed those messages just fine and stored them in the
>>>> tridentState (Couchbase).
>>>>
>>>> But all new messages are simply ignored!
>>>>
>>>> After redeploying the topology (both with forceFromStart = true and with
>>>> forceFromStart = false), no more messages are ingested from Kafka.
>>>>
>>>> Here is the worker log for one topology deployment and a short run:
>>>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>>>
>>>> These are the VMs that host the Storm cluster:
>>>> 10.0.0.241 zookeeper1
>>>> 10.0.0.101 supervisor1
>>>> 10.0.0.21 kafka1
>>>> 10.0.0.251 nimbus1
>>>>
>>>> Thanks,
>>>> Milos
>>>>
>>>
>>> --
>>> Danijel Schiavuzzi
>>>
>>> E: dani...@schiavuzzi.com
>>> W: www.schiavuzzi.com
>>> T: +385989035562
>>> Skype: danijels7
>>>
>>
>>
>
> --
> Danijel Schiavuzzi
>
> E: dani...@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
>
