Hello,
Working with kafka 0.10.1.0, I used these config
val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer.getClass)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"latest")
but these code
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"latest")
does not work, what is the reason?
I read the code of org.apache.kafka.streams.StreamsConfig, there has some code:
private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
static
{
Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
"1000");
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false");
CONSUMER_DEFAULT_OVERRIDES =
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}
public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String
groupId, String clientId) throws ConfigException {
final Map<String, Object> consumerProps =
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
// disable auto commit and throw exception if there is user overridden
values,
// this is necessary for streams commit semantics
if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config "
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ ", as the streams client will always turn off auto
committing.");
}
consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
// bootstrap.servers should be from StreamsConfig
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix, and group id
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId +
"-consumer");
// add configs required for stream partition assignor
consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE,
streamThread);
consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
getInt(REPLICATION_FACTOR_CONFIG));
consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
getInt(NUM_STANDBY_REPLICAS_CONFIG));
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
StreamPartitionAssignor.class.getName());
consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {
consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
getString(ZOOKEEPER_CONNECT_CONFIG));
}
consumerProps.put(APPLICATION_SERVER_CONFIG,
getString(APPLICATION_SERVER_CONFIG));
return consumerProps;
}
It will be use the CONSUMER_DEFAULT_OVERRIDES override the config of I set?