Hi,
Setting max spout pending value very high causes this. Try setting with low
value say 1. My topology works good with 10 as max spout pending. More info
here http://storm.apache.org/releases/1.1.2/FAQ.html

Thanks,
Shaik Asifullah



On Tue, 20 Nov 2018, 07:23 731635762 <mrche...@qq.com wrote:

> Hi All, I use storm+kafka+protobuf to build my stream process system.
>
> The problem is KafkaTridentSpoutOpaque Repeated consumption the last
> message. I really want  just once consumer for every message in kafka.
>
> This are my some detail:
> ---------------------java dependency----------
>         storm-kafka-client    1.2.2
>         storm-core    1.2.2
>         kafka_2.10    0.10.2.0
>
> --------------------component-----------------
> kafka_2.12-2.0.0
> apache-storm-1.2.2
>
> ------------------------------------- build KafkaTridentSpoutOpaque
> instance code ----------------------
>
> protected static KafkaSpoutConfig<String, byte[]>
> newKafkaSpoutConfig(String bootstrapServers, String topic) {
>         KafkaSpoutConfig.Builder<String, byte[]> builder = new
> KafkaSpoutConfig.Builder<>(bootstrapServers, topic);
>
>         return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG,
> "stormKafkaSpoutGroup")
>                 .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
>                 .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.StringDeserializer")
>                 .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>                 .setRecordTranslator(new JustValueFunc(), new
> Fields("str"))
>                 .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
>                 .setProcessingGuarantee(AT_MOST_ONCE)
>                 .build();
>     }
>
>     private static KafkaTridentSpoutOpaque<String, byte[]>
> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, byte[]> spoutConfig) {
>         return new KafkaTridentSpoutOpaque<>(spoutConfig);
>     }
>
>     private static class JustValueFunc implements
> Func<ConsumerRecord<String, byte[]>, List<Object>>, Serializable {
>         @Override
>         public List<Object> apply(ConsumerRecord<String, byte[]> record) {
>             Values res = null;
>             try {
>                 res = new
> Values(PbMiddlewareTransfer.Record.parseFrom(record.value()));
>             } catch (InvalidProtocolBufferException e) {
>                 e.printStackTrace();
>             }
>             return res;
>         }
>     }
>
> -------------------------------there is my topology code
> ---------------------------------------
> public static void main(String[] args) throws Exception {
>         StormTopology topology = getTridentTopology();
>         Config conf = new Config();
>         conf.setNumWorkers(20);
>         conf.setMaxSpoutPending(5000);
>         StormSubmitter.submitTopology("storm-kafka-client-spout-test",
> conf, topology);
>     }
>
>     public static StormTopology getTridentTopology() {
>         final TridentTopology tridentTopology = new TridentTopology();
>
>         KafkaSpoutConfig<String, byte[]> spoutConfig =
> newKafkaSpoutConfig("192.168.0.202:9092", "test-2");
>         ITridentDataSource spout = newKafkaTridentSpoutOpaque(spoutConfig);
>
>         final Stream spoutStream = tridentTopology.newStream("spout",
> spout).parallelismHint(1);
>
>         spoutStream.each(spoutStream.getOutputFields(), new Debug("#####
> fastest driver"));
>
>         return tridentTopology.build();
>     }
>
>
> ------------------------------log-------------------------
> ./6702/worker.log:2018-11-19 20:19:12.418 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:19:12 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:19:25.908 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:19:25 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:01.997 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:01 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:30.591 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:30 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:42.960 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:42 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:44.477 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:44 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:47.501 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:47 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:48.516 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:48 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:20:54.072 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:20:54 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:22:01.171 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:22:01 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:22:27.380 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:22:27 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:03.992 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:03 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:14.893 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:14 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:20.955 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:20 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:25.495 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:25 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:47.978 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:47 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:23:56.440 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:23:56 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:24:33.534 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:24:33 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:27:35.588 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:27:35 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:28:23.784 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:28:23 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:28:48.155 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:28:48 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:29:12.218 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:29:12 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:31:15.597 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:31:15 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:31:30.720 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:31:30 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:33:07.871 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:33:07 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:33:27.889 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:33:27 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:34:34.126 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:34:34 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:35:36.615 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:35:36 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:39:31.282 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:39:31 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:40:15.364 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:40:15 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:41:15.565 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:41:15 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:41:16.570 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:41:16 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:41:54.130 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:41:54 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:43:30.303 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:43:30 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:44:26.049 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:44:26 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:52:43.618 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:52:43 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:54:01.904 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:54:01 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:55:13.448 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:55:13 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
> ./6702/worker.log:2018-11-19 20:59:15.220 STDIO Thread-9-b-0-executor[23
> 23] [INFO] <Mon Nov 19 20:59:15 CST 2018> DEBUG(##### fastest driver):
> [metricID: 1
>
> I just produce one message put in kafka, above message is my topology
> output message. it is except just one, but there are many. And it will
> repeted about half hour.
>
> Any help is appreciated.
>
> Thanks,
> ChenBo
>
>

Reply via email to