Hi,

Setting the max spout pending value very high causes this (your topology sets it to 5000). Try a low value, say 1. My topology works well with max spout pending set to 10. More info here: http://storm.apache.org/releases/1.1.2/FAQ.html
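In case it helps, here is a minimal sketch of the change I mean. It uses a plain Map so it runs without Storm on the classpath; the class and method names are made up for illustration, and the only Storm-specific parts are the two config keys, which as far as I know are the ones behind conf.setNumWorkers(...) and conf.setMaxSpoutPending(...).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Storm: builds the topology settings as a plain map.
class MaxSpoutPendingSketch {
    // Config keys that Storm's Config.setMaxSpoutPending / setNumWorkers write to.
    static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";
    static final String TOPOLOGY_WORKERS = "topology.workers";

    static Map<String, Object> lowPendingConf(int maxSpoutPending) {
        if (maxSpoutPending < 1) {
            throw new IllegalArgumentException("max spout pending must be >= 1");
        }
        Map<String, Object> conf = new HashMap<>();
        conf.put(TOPOLOGY_WORKERS, 20);                        // same worker count as in your main()
        conf.put(TOPOLOGY_MAX_SPOUT_PENDING, maxSpoutPending); // was 5000 in your main()
        return conf;
    }

    public static void main(String[] args) {
        // Start with 1, then raise it step by step (10 works for my topology).
        System.out.println(lowPendingConf(1));
    }
}
```

In your own main() the equivalent is simply replacing conf.setMaxSpoutPending(5000) with conf.setMaxSpoutPending(1).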
Thanks,
Shaik Asifullah

On Tue, 20 Nov 2018, 07:23 731635762 <mrche...@qq.com wrote:

> Hi All,
>
> I use storm+kafka+protobuf to build my stream processing system.
>
> The problem is that KafkaTridentSpoutOpaque repeatedly consumes the last
> message. I want every message in Kafka to be consumed exactly once.
>
> Here are some details:
> ---------------------java dependency----------
> storm-kafka-client 1.2.2
> storm-core 1.2.2
> kafka_2.10 0.10.2.0
>
> --------------------component-----------------
> kafka_2.12-2.0.0
> apache-storm-1.2.2
>
> ------------------------------------- build KafkaTridentSpoutOpaque instance code ----------------------
>
> protected static KafkaSpoutConfig<String, byte[]> newKafkaSpoutConfig(String bootstrapServers, String topic) {
>     KafkaSpoutConfig.Builder<String, byte[]> builder = new KafkaSpoutConfig.Builder<>(bootstrapServers, topic);
>
>     return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, "stormKafkaSpoutGroup")
>             .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
>             .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>             .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>             .setRecordTranslator(new JustValueFunc(), new Fields("str"))
>             .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
>             .setProcessingGuarantee(AT_MOST_ONCE)
>             .build();
> }
>
> private static KafkaTridentSpoutOpaque<String, byte[]> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, byte[]> spoutConfig) {
>     return new KafkaTridentSpoutOpaque<>(spoutConfig);
> }
>
> private static class JustValueFunc implements Func<ConsumerRecord<String, byte[]>, List<Object>>, Serializable {
>     @Override
>     public List<Object> apply(ConsumerRecord<String, byte[]> record) {
>         Values res = null;
>         try {
>             res = new Values(PbMiddlewareTransfer.Record.parseFrom(record.value()));
>         } catch (InvalidProtocolBufferException e) {
>             e.printStackTrace();
>         }
>         return res;
>     }
> }
>
> -------------------------------here is my topology code ---------------------------------------
> public static void main(String[] args) throws Exception {
>     StormTopology topology = getTridentTopology();
>     Config conf = new Config();
>     conf.setNumWorkers(20);
>     conf.setMaxSpoutPending(5000);
>     StormSubmitter.submitTopology("storm-kafka-client-spout-test", conf, topology);
> }
>
> public static StormTopology getTridentTopology() {
>     final TridentTopology tridentTopology = new TridentTopology();
>
>     KafkaSpoutConfig<String, byte[]> spoutConfig = newKafkaSpoutConfig("192.168.0.202:9092", "test-2");
>     ITridentDataSource spout = newKafkaTridentSpoutOpaque(spoutConfig);
>
>     final Stream spoutStream = tridentTopology.newStream("spout", spout).parallelismHint(1);
>
>     spoutStream.each(spoutStream.getOutputFields(), new Debug("##### fastest driver"));
>
>     return tridentTopology.build();
> }
>
>
> ------------------------------log-------------------------
> ./6702/worker.log:2018-11-19 20:19:12.418 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:19:25.908 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:01.997 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:30.591 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:42.960 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:42 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:44.477 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:44 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:47.501 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:48.516 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:54.072 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:22:01.171 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:22:27.380 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:03.992 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:03 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:14.893 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:14 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:20.955 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:20 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:25.495 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:47.978 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:56.440 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:56 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:24:33.534 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:24:33 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:27:35.588 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:27:35 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:28:23.784 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:23 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:28:48.155 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:29:12.218 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:29:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:31:15.597 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:31:30.720 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:33:07.871 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:07 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:33:27.889 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:34:34.126 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:34:34 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:35:36.615 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:35:36 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:39:31.282 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:39:31 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:40:15.364 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:40:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:41:15.565 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:41:16.570 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:16 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:41:54.130 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:43:30.303 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:43:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:44:26.049 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:44:26 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:52:43.618 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:52:43 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:54:01.904 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:54:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:55:13.448 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:55:13 CST 2018> DEBUG(##### fastest driver): [metricID: 1
> ./6702/worker.log:2018-11-19 20:59:15.220 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:59:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
>
> I produced just one message to Kafka; the log above is my topology's
> output. I expected just one line, but there are many, and it kept
> repeating for about half an hour.
>
> Any help is appreciated.
>
> Thanks,
> ChenBo