[ https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stig Rohde Døssing reassigned STORM-2675: ----------------------------------------- Assignee: Stig Rohde Døssing > KafkaTridentSpoutOpaque not committing offsets to Kafka > ------------------------------------------------------- > > Key: STORM-2675 > URL: https://issues.apache.org/jira/browse/STORM-2675 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka-client > Affects Versions: 1.1.0 > Reporter: Preet Puri > Assignee: Stig Rohde Døssing > > Every time I restart the topology the spout was picking the earliest message > even though poll strategy is set UNCOMMITTED_EARLIEST. I looked at Kafka's > __consumer_offsets topic to see if spout (consumer) is committing the offsets > but did not find any commits. I am not even able to locate the code in the > KafkaTridentSpoutEmitter class where we are updating the commits? > conf.put(Config.TOPOLOGY_DEBUG, true); > conf.put(Config.TOPOLOGY_WORKERS, 1); > conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); //tried with1 as well > conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate"); > conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new > String[]{"localhost"})); > conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181); > protected static KafkaSpoutConfig<String, String> > getPMStatKafkaSpoutConfig() { > ByTopicRecordTranslator<String, String> byTopic = > new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), > r.value()), > new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM); > return new KafkaSpoutConfig.Builder<String, > String>(Utils.getBrokerHosts(), > StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName()) > .setMaxPartitionFectchBytes(10 * 1024) // 10 KB > .setRetry(getRetryService()) > .setOffsetCommitPeriodMs(10_000) > > .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) > .setMaxUncommittedOffsets(250) > .setProp("value.deserializer", > "io.confluent.kafka.serializers.KafkaAvroDeserializer") > .setProp("schema.registry.url","http://localhost:8081") > .setProp("specific.avro.reader",true) > .setGroupId(AGGREGATION_CONSUMER_GROUP) > .setRecordTranslator(byTopic).build(); > } > Stream pmStatStream = > topology.newStream("statStream", new > KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1) > storm-version - 1.1.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)