[
https://issues.apache.org/jira/browse/KAFKA-3171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Victor Serbu resolved KAFKA-3171.
---------------------------------
Resolution: Invalid
You are right: after I changed "ack" to "acks", the producer raises an error
when the number of ISRs is less than the configured minimum.
{code}
1454106693 Send message: (0, {"id":0,"msg_text":"aaaaaa"})
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
at kafka.examples.Producer.run(Producer.java:73)
Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
{code}
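For reference, the rule behind this exception can be sketched roughly as follows. This is a simplified model, not Kafka's actual broker code; the class and method names (MinIsrCheck, leaderAccepts) are made up for illustration. It assumes the partition leader enforces min.insync.replicas only for requests sent with acks=all (or the equivalent -1), which would explain why a producer whose acks setting was never applied did not trigger the check.

```java
public class MinIsrCheck {
    /**
     * Simplified model of the leader-side decision (an assumption for
     * illustration, not the broker's real implementation): a write is
     * rejected only when the producer asked for acks=all and the current
     * ISR is smaller than min.insync.replicas.
     */
    public static boolean leaderAccepts(String acks, int isrSize, int minInsyncReplicas) {
        boolean requiresAllAcks = acks.equals("all") || acks.equals("-1");
        return !(requiresAllAcks && isrSize < minInsyncReplicas);
    }

    public static void main(String[] args) {
        // One broker left in the ISR, min.insync.replicas=2, acks=all: rejected.
        System.out.println(leaderAccepts("all", 1, 2)); // prints "false"
        // Same cluster state, but acks=1: the minimum is not enforced.
        System.out.println(leaderAccepts("1", 1, 2));   // prints "true"
        // Full ISR with acks=all: accepted.
        System.out.println(leaderAccepts("all", 3, 2)); // prints "true"
    }
}
```

The second case corresponds to the original test: with one broker in the ISR and acks not set to "all", the messages were accepted.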
I did not see any error on stdout/stderr when I used the wrong parameter name.
I will try to configure log4j for my test program.
Sorry that I did not verify this; the documentation also specifies the correct
parameter, "acks".
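Because a misspelled key can pass unnoticed when logging is not configured, a small check before constructing the producer may help catch this kind of typo. The sketch below is only an illustration: the class ProducerConfigCheck, the method unknownKeys, and the hand-maintained KNOWN_KEYS list are hypothetical, and the list covers only the producer settings used in this test, not Kafka's full set.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

public class ProducerConfigCheck {
    // Hand-maintained subset of producer config names (an assumption for
    // this sketch; it is not Kafka's complete list).
    static final Set<String> KNOWN_KEYS = new HashSet<>(Arrays.asList(
            "bootstrap.servers", "client.id", "retries",
            "key.serializer", "value.serializer", "acks"));

    /** Returns the property names that are not in KNOWN_KEYS, sorted. */
    public static Set<String> unknownKeys(Properties props) {
        Set<String> unknown = new TreeSet<>();
        // Iterate over keySet() so entries with non-String values
        // (e.g. an Integer for "retries") are checked as well.
        for (Object key : props.keySet()) {
            String name = String.valueOf(key);
            if (!KNOWN_KEYS.contains(name)) {
                unknown.add(name);
            }
        }
        return unknown;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("client.id", "DemoProducer");
        props.put("retries", 1);
        props.put("ack", "all"); // the misspelled key from the report
        System.out.println(unknownKeys(props)); // prints "[ack]"
    }
}
```

With the kafka-clients jar on the classpath, using the constant ProducerConfig.ACKS_CONFIG instead of a string literal avoids the typo in the first place.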
> min.insync.replicas not respected
> ---------------------------------
>
> Key: KAFKA-3171
> URL: https://issues.apache.org/jira/browse/KAFKA-3171
> Project: Kafka
> Issue Type: Bug
> Environment: Kafka downloaded from
> http://apache.javapipe.com/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
> Reporter: Victor Serbu
> Assignee: Grant Henke
>
> Hello.
> I wanted to test Kafka's durability properties, but I did not manage to block
> the producer when the number of ISRs is less than min.insync.replicas.
> For my test I used 3 VMs. Each VM runs Kafka and ZooKeeper.
> Then I created a topic with min ISR = 2.
> {code}
> [root@kafka1 kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --create --zookeeper
> kafka1.novalocal:2181,kafka2.novalocal:2181,kafka3.novalocal:2181
> --replication-factor 3 --partitions 1 --config 'min.insync.replicas=3'
> --topic test61
> Created topic "test61".
> [root@kafka1 kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper
> kafka1.novalocal:2181,kafka2.novalocal:2181,kafka3.novalocal:2181 --topic
> test60
> Topic:test60 PartitionCount:1 ReplicationFactor:3
> Configs:min.insync.replicas=2
> Topic: test60 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
> {code}
> Then I stopped two of the Kafka brokers and started a producer, and all
> messages were committed (using a consumer, I saw all the messages).
> {code}
> [root@kafka1 kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper
> kafka1.novalocal:2181,kafka2.novalocal:2181,kafka3.novalocal:2181 --topic
> test60
> Topic:test60 PartitionCount:1 ReplicationFactor:3
> Configs:min.insync.replicas=2
> Topic: test60 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0
> {code}
> Producer
> {code}
> /**
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package kafka.examples;
> import java.util.Properties;
> import java.util.Map;
> import java.util.Iterator;
> import java.util.concurrent.ExecutionException;
> import org.apache.kafka.clients.producer.Callback;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.Metric;
> import org.apache.kafka.common.MetricName;
> public class Producer extends Thread
> {
> private final KafkaProducer<Integer, String> producer;
> private final String topic;
> private final Boolean isAsync;
> private final Integer maxId;
> private final Boolean debug;
> public Producer(String topic, Boolean isAsync, Integer maxId, Boolean debug)
> {
> Properties props = new Properties();
> props.put("bootstrap.servers",
> "kafka1.novalocal:9092,kafka2.novalocal:9092,kafka3.novalocal:9092");
> props.put("client.id", "DemoProducer");
> props.put("retries",1);
> props.put("key.serializer",
> "org.apache.kafka.common.serialization.IntegerSerializer");
> props.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("ack", "all");
> producer = new KafkaProducer<Integer, String>(props);
> this.topic = topic;
> this.isAsync = isAsync;
> this.maxId=maxId;
> this.debug = debug;
> }
> public void run() {
> int messageNo = 1;
> String messageStr;
> long startTime = System.currentTimeMillis();
> for (messageNo = 0; messageNo < this.maxId; messageNo++) {
> messageStr = "{\"id\":" + messageNo +
> ",\"msg_text\":\"aaaaaa\"}";
> if (isAsync) { // Send asynchronously
> producer.send(new ProducerRecord<Integer, String>(topic,
> messageNo,
> messageStr), new DemoCallBack(startTime, messageNo,
> messageStr));
> } else { // Send synchronously
> try {
> if (this.debug) {
> System.out.println("" + (int) (System.currentTimeMillis()
> / 1000L) + " Send message: (" + messageNo + ", " + messageStr + ")");
> }
> producer.send(new ProducerRecord<Integer, String>(topic,
> messageNo,
> messageStr)).get();
> if (this.debug) {
> System.out.println("" + (int) (System.currentTimeMillis()
> / 1000L) + " Sent message: (" + messageNo + ", " + messageStr + ")");
> }
> } catch (InterruptedException e) {
> e.printStackTrace();
> } catch (ExecutionException e) {
> e.printStackTrace();
> }
> }
> }
> producer.close();
> }
> public static void main(String [] args) {
> String topic = args[0];
> Integer messageMaxId = Integer.parseInt(args[1]);
> Boolean debug = Boolean.parseBoolean(args[2]);
> Producer p1 = new Producer(topic, Boolean.FALSE, messageMaxId, debug);
> p1.start();
> }
> }
> class DemoCallBack implements Callback {
> private long startTime;
> private int key;
> private String message;
> public DemoCallBack(long startTime, int key, String message) {
> this.startTime = startTime;
> this.key = key;
> this.message = message;
> }
> /**
> * A callback method the user can implement to provide asynchronous handling of request completion. This method will
> * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
> * non-null.
> *
> * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
> * occurred.
> * @param exception The exception thrown during processing of this record. Null if no error occurred.
> */
> public void onCompletion(RecordMetadata metadata, Exception exception) {
> long elapsedTime = System.currentTimeMillis() - startTime;
> if (metadata != null) {
> System.out.println(
> "message(" + key + ", " + message + ") sent to partition(" +
> metadata.partition() +
> "), " +
> "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
> } else {
> exception.printStackTrace();
> }
> }
> }
> {code}
> Kafka config
> {code}
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements. See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> # see kafka.server.KafkaConfig for additional details and defaults
> ############################# Server Basics #############################
> # The id of the broker. This must be set to a unique integer for each broker.
> broker.id=1
> ############################# Socket Server Settings #############################
> listeners=PLAINTEXT://:9092
> # The port the socket server listens on
> #port=9092
> # Hostname the broker will bind to. If not set, the server will bind to all interfaces
> #host.name=localhost
> # Hostname the broker will advertise to producers and consumers. If not set, it uses the
> # value for "host.name" if configured. Otherwise, it will use the value returned from
> # java.net.InetAddress.getCanonicalHostName().
> #advertised.host.name=<hostname routable by clients>
> # The port to publish to ZooKeeper for clients to use. If this is not set,
> # it will publish the same port that the broker binds to.
> #advertised.port=<port accessible by clients>
> # The number of threads handling network requests
> num.network.threads=3
> # The number of threads doing disk I/O
> num.io.threads=8
> # The send buffer (SO_SNDBUF) used by the socket server
> socket.send.buffer.bytes=102400
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=102400
> # The maximum size of a request that the socket server will accept (protection against OOM)
> socket.request.max.bytes=104857600
> ############################# Log Basics #############################
> # A comma separated list of directories under which to store log files
> log.dirs=/tmp/kafka-logs
> # The default number of log partitions per topic. More partitions allow greater
> # parallelism for consumption, but this will also result in more files across
> # the brokers.
> num.partitions=1
> # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
> # This value is recommended to be increased for installations with data dirs located in RAID array.
> num.recovery.threads.per.data.dir=1
> ############################# Log Flush Policy #############################
> # Messages are immediately written to the filesystem but by default we only fsync() to sync
> # the OS cache lazily. The following configurations control the flush of data to disk.
> # There are a few important trade-offs here:
> # 1. Durability: Unflushed data may be lost if you are not using replication.
> # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
> # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data after a period of time or
> # every N messages (or both). This can be done globally and overridden on a per-topic basis.
> # The number of messages to accept before forcing a flush of data to disk
> #log.flush.interval.messages=10000
> # The maximum amount of time a message can sit in a log before we force a flush
> #log.flush.interval.ms=1000
> ############################# Log Retention Policy #############################
> # The following configurations control the disposal of log segments. The policy can
> # be set to delete segments after a period of time, or after a given size has accumulated.
> # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
> # from the end of the log.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=168
> # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
> # segments don't drop below log.retention.bytes.
> #log.retention.bytes=1073741824
> # The maximum size of a log segment file. When this size is reached a new log segment will be created.
> log.segment.bytes=1073741824
> # The interval at which log segments are checked to see if they can be deleted according
> # to the retention policies
> log.retention.check.interval.ms=300000
> # By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
> # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
> log.cleaner.enable=false
> ############################# Zookeeper #############################
> # Zookeeper connection string (see zookeeper docs for details).
> # This is a comma separated host:port pairs, each corresponding to a zk
> # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
> # You can also append an optional chroot string to the urls to specify the
> # root directory for all kafka znodes.
> zookeeper.connect=kafka1.novalocal:2181,kafka2.novalocal:2181,kafka3.novalocal:2181
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
> unclean.leader.election.enable=false
> {code}
> Zookeeper config
> {code}
> tickTime=2000
> dataDir=/var/lib/zookeeper/data
> clientPort=2181
> initLimit=5
> syncLimit=2
> server.1=kafka1.novalocal:2888:3888
> server.2=kafka2.novalocal:2888:3888
> server.3=kafka3.novalocal:2888:3888
> {code}