Add another point:
We still use the asynchronous send method. The parameter
alpha.feature.akka.channel.kafka.producer.batchSize controls when
producer.flush() is triggered, so the balance between performance and
reliability depends on the value the user sets.
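For example, a user could set the batch size in the Alpha configuration like this (the value 1000 is only illustrative, not a recommended default):

```properties
# flush the producer after every 1000 buffered events (illustrative value)
alpha.feature.akka.channel.kafka.producer.batchSize=1000
```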
The most important thing is that Omega knows when Alpha has failed.
List<BaseEvent> eventCache = new LinkedList<>();

@Override
public void publish(Object data) {
    if (logger.isDebugEnabled()) {
        logger.debug("send message [{}] to [{}]", data, topic);
    }
    if (!(data instanceof BaseEvent)) {
        throw new UnsupportedOperationException("data must be BaseEvent type");
    }
    eventCache.add((BaseEvent) data);
    if (eventCache.size() == batchSize) {
        try {
            List<Future<?>> kafkaFutures = new LinkedList<>();
            for (BaseEvent event : eventCache) {
                kafkaFutures.add(kafkaTemplate.send(topic, event.getGlobalTxId(), event));
            }
            producer.flush();
            // block until every record in the batch is acknowledged
            for (Future<?> future : kafkaFutures) {
                future.get();
            }
            eventCache.clear();
        } catch (InterruptedException | ExecutionException ex) {
            logger.warn("Sending events to Kafka failed", ex);
            throw new RuntimeException("Commit failed as send to Kafka failed", ex);
        }
    }
}
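The batching logic above can also be sketched independently of Kafka, with the sender injected as a function; this makes the flush-every-batchSize behavior easy to test in isolation (the class and names below are hypothetical, not part of the Alpha code):

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: buffers events and hands a full batch to a sender,
// mirroring the flush-every-batchSize behavior of publish() above.
class BatchingPublisher<T> {
    private final List<T> cache = new LinkedList<>();
    private final int batchSize;
    private final Consumer<List<T>> sender; // e.g. a Kafka send-and-flush step

    BatchingPublisher(int batchSize, Consumer<List<T>> sender) {
        this.batchSize = batchSize;
        this.sender = sender;
    }

    public void publish(T event) {
        cache.add(event);
        if (cache.size() == batchSize) {
            // hand over a copy so the sender can retry independently of the cache
            sender.accept(new ArrayList<>(cache));
            cache.clear();
        }
    }
}
```

With batchSize = 3, publishing five events triggers exactly one send of three events; the remaining two stay cached until the batch fills.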
> On Aug 17, 2019, at 3:29 PM, Zhang Lei <[email protected]> wrote:
>
> Hi, Team
>
> Our previous discussion on PR [1] about using synchronous or asynchronous
> methods to send Kafka messages shows we need a trade-off between reliability
> and performance.
>
> Maybe we can leave the choice to the user by letting them customize some
> parameters. I have the following suggestions about the Kafka producer
> parameters:
>
> Key requirement: messages must be ordered and must not be lost, but duplicates
> are acceptable for the FSM
>
> 1. Default parameters
>
> max.in.flight.requests.per.connection is 1 (user modification is prohibited
> to preserve ordering)
> acks is -1
> retries is greater than 0
>
> 2. Allow users to define most parameters of the Kafka producer, e.g.
>
> acks
> retries
> buffer.memory
> compression.type
> min.insync.replicas > 1 (use with acks)
> replication.factor > min.insync.replicas
> timeout.ms
> request.timeout.ms
> metadata.fetch.timeout.ms
> max.block.ms
> max.request.size
>
> 3. KafkaProducer.send(record, callback) or KafkaProducer.send(record).get()
>
> KafkaProducer.send(record).get() can cause performance problems, but we can
> mitigate this by deploying multiple Alpha instances
>
> KafkaProducer.send(record, callback) with max.block.ms=0 and a large enough
> buffer.memory. But we still have to handle the callback failure scenario.
> In asynchronous mode, if messages have been sent but acknowledgments have not
> been received, the buffer pool fills up; if the configuration does not limit
> the blocking timeout, the producer blocks indefinitely, which ensures that
> data is not lost.
>
> Maybe we can use these parameters to let users choose between synchronous and
> asynchronous sending, using asynchronous mode for better performance when the
> network and the Kafka cluster are reliable.
>
> [1] https://github.com/apache/servicecomb-pack/pull/540
> <https://github.com/apache/servicecomb-pack/pull/540>
>
> Lei Zhang
>