Hello,
    Working with kafka 0.10.1.0, I used these config

val props = new Properties

props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker)

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)

props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer.getClass)

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest")







but these code 
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest")


does not work, what is the reason?



I read the code of org.apache.kafka.streams.StreamsConfig, there has some code:



private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;

static

{

    Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();

    tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");

    tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");

    tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");




    CONSUMER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
} 



public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String 
groupId, String clientId) throws ConfigException {

    final Map<String, Object> consumerProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());




    // disable auto commit and throw exception if there is user overridden 
values,

    // this is necessary for streams commit semantics

    if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {

        throw new ConfigException("Unexpected user-specified consumer config " 
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG

                + ", as the streams client will always turn off auto 
committing.");

    }




    consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);




    // bootstrap.servers should be from StreamsConfig

    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
this.originals().get(BOOTSTRAP_SERVERS_CONFIG));

    // add client id with stream client id prefix, and group id

    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-consumer");




    // add configs required for stream partition assignor

    consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, 
streamThread);

    consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 
getInt(REPLICATION_FACTOR_CONFIG));

    consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
getInt(NUM_STANDBY_REPLICAS_CONFIG));

    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
StreamPartitionAssignor.class.getName());

    
consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
 getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));

    if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {

        consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, 
getString(ZOOKEEPER_CONNECT_CONFIG));

    }




    consumerProps.put(APPLICATION_SERVER_CONFIG, 
getString(APPLICATION_SERVER_CONFIG));

    return consumerProps;

}





It will be use the CONSUMER_DEFAULT_OVERRIDES override the config of I set?

Reply via email to