[ https://issues.apache.org/jira/browse/KAFKA-3171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Victor Serbu resolved KAFKA-3171.
---------------------------------
    Resolution: Invalid

You are right. After I changed "ack" to "acks", the producer raises an error when the number of in-sync replicas is below the configured minimum.

{code}
1454106693 Send message: (0, {"id":0,"msg_text":"aaaaaa"})
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at kafka.examples.Producer.run(Producer.java:73)
Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
{code}
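
In case it helps others: a minimal sketch of unwrapping the ExecutionException so a test program can react to NotEnoughReplicasException specifically (the helper class and method names are mine, not part of my original test program):

{code}
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.NotEnoughReplicasException;

public class SendHelper {
    // Returns true if the record was acknowledged, false if the broker
    // rejected it because the ISR was smaller than min.insync.replicas.
    static boolean sendSync(KafkaProducer<Integer, String> producer,
                            ProducerRecord<Integer, String> record)
            throws InterruptedException {
        try {
            producer.send(record).get();
            return true;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof NotEnoughReplicasException) {
                return false;
            }
            throw new RuntimeException("unexpected send failure", e.getCause());
        }
    }
}
{code}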

I didn't see any error on stdout/stderr when I used the wrong parameter name. I will try to configure log4j for my test program.

Sorry that I didn't verify this earlier; the documentation does specify the correct parameter name, "acks".
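
For what it's worth, the ProducerConfig key constants make this kind of typo much harder than bare string literals; a minimal sketch (broker list as in my test setup):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ConfigWithConstants {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "kafka1.novalocal:9092,kafka2.novalocal:9092,kafka3.novalocal:9092");
        // ACKS_CONFIG resolves to "acks"; a misspelled constant would not compile.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
        producer.close();
    }
}
{code}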

> min.insync.replicas not respected
> ---------------------------------
>
>                 Key: KAFKA-3171
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3171
>             Project: Kafka
>          Issue Type: Bug
>         Environment: Kafka downloaded from 
> http://apache.javapipe.com/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
>            Reporter: Victor Serbu
>            Assignee: Grant Henke
>
> Hello.
> I wanted to test Kafka's durability properties, but I didn't manage to block the producer when the number of in-sync replicas is below min.insync.replicas.
> For my test I used 3 VMs, each running Kafka and ZooKeeper.
> Then I created a topic with min ISR = 2.
> {code}
> [root@kafka1 kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --create --zookeeper kafka1.novalocal:2181,kafka2.novalocal:2181,kafka3.novalocal:2181 --replication-factor 3 --partitions 1 --config 'min.insync.replicas=3' --topic test61
> Created topic "test61".
> [root@kafka1 kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper kafka1.novalocal:2181,kafka2.novalocal:2181,kafka3.novalocal:2181 --topic test60
> Topic:test60    PartitionCount:1        ReplicationFactor:3     Configs:min.insync.replicas=2
>         Topic: test60   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
> {code}
> Then I stopped two of the Kafka brokers and started a producer, and all messages were committed (using a consumer I saw all the messages).
> {code}
> [root@kafka1 kafka_2.11-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper kafka1.novalocal:2181,kafka2.novalocal:2181,kafka3.novalocal:2181 --topic test60
> Topic:test60    PartitionCount:1        ReplicationFactor:3     Configs:min.insync.replicas=2
>         Topic: test60   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0
> {code}
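> With the ISR reduced to the leader alone and min.insync.replicas=2, a producer using acks=all should have every synchronous send rejected. A minimal sketch of that expected behavior (topic and broker names as above):
> {code}
> // Minimal probe (assumes the topic/brokers above): with acks=all and fewer
> // in-sync replicas than min.insync.replicas, send().get() throws an
> // ExecutionException caused by NotEnoughReplicasException.
> import java.util.Properties;
> import java.util.concurrent.ExecutionException;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> public class MinIsrProbe {
>   public static void main(String[] args) throws InterruptedException {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "kafka1.novalocal:9092");
>     props.put("acks", "all"); // note: "acks", not "ack"
>     props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
>     try {
>       producer.send(new ProducerRecord<Integer, String>("test60", 0, "probe")).get();
>       System.out.println("acknowledged (ISR meets the minimum)");
>     } catch (ExecutionException e) {
>       System.err.println("rejected: " + e.getCause()); // expected while ISR < 2
>     } finally {
>       producer.close();
>     }
>   }
> }
> {code}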
> Producer
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *    http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package kafka.examples;
> import java.util.Properties;
> import java.util.Map;
> import java.util.Iterator;
> import java.util.concurrent.ExecutionException;
> import org.apache.kafka.clients.producer.Callback;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.Metric;
> import org.apache.kafka.common.MetricName;
> public class Producer extends Thread
> {
>   private final KafkaProducer<Integer, String> producer;
>   private final String topic;
>   private final Boolean isAsync;
>   private final Integer maxId;
>   private final Boolean debug;
>   public Producer(String topic, Boolean isAsync, Integer maxId, Boolean debug)
>   {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", 
> "kafka1.novalocal:9092,kafka2.novalocal:9092,kafka3.novalocal:9092");
>     props.put("client.id", "DemoProducer");
>     props.put("retries",1);
>     props.put("key.serializer", 
> "org.apache.kafka.common.serialization.IntegerSerializer");
>     props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("ack", "all");
>     producer = new KafkaProducer<Integer, String>(props);
>     this.topic = topic;
>     this.isAsync = isAsync;
>     this.maxId=maxId;
>     this.debug = debug;
>   }
>   public void run() {
>       int messageNo = 1;
>       String messageStr;
>       long startTime = System.currentTimeMillis();
>       for (messageNo = 0; messageNo < this.maxId; messageNo++) {
>               messageStr = "{\"id\":" + messageNo + 
> ",\"msg_text\":\"aaaaaa\"}";
>               if (isAsync) { // Send asynchronously
>                 producer.send(new ProducerRecord<Integer, String>(topic,
>                     messageNo,
>                     messageStr), new DemoCallBack(startTime, messageNo, 
> messageStr));
>               } else { // Send synchronously
>                 try {
>                   if (this.debug) {
>                     System.out.println("" + (int) (System.currentTimeMillis() 
> / 1000L) + " Send message: (" + messageNo + ", " + messageStr + ")");
>                   }
>                   producer.send(new ProducerRecord<Integer, String>(topic,
>                       messageNo,
>                       messageStr)).get();
>                   if (this.debug) {
>                     System.out.println("" + (int) (System.currentTimeMillis() 
> / 1000L) + " Sent message: (" + messageNo + ", " + messageStr + ")");
>                   }
>                 } catch (InterruptedException e) {
>                   e.printStackTrace();
>                 } catch (ExecutionException e) {
>                   e.printStackTrace();
>                 }
>               }
>       }
>       producer.close();
>     }
>   public static void main(String [] args) {
>     String topic = args[0];
>     Integer messageMaxId = Integer.parseInt(args[1]);
>     Boolean debug = Boolean.parseBoolean(args[2]);
>     Producer p1 = new Producer(topic, Boolean.FALSE, messageMaxId, debug);
>     p1.start();
>   }
> }
> class DemoCallBack implements Callback {
>   private long startTime;
>   private int key;
>   private String message;
>   public DemoCallBack(long startTime, int key, String message) {
>     this.startTime = startTime;
>     this.key = key;
>     this.message = message;
>   }
>   /**
>    * A callback method the user can implement to provide asynchronous handling of request completion. This method will
>    * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
>    * non-null.
>    *
>    * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
>    *                  occurred.
>    * @param exception The exception thrown during processing of this record. Null if no error occurred.
>    */
>   public void onCompletion(RecordMetadata metadata, Exception exception) {
>     long elapsedTime = System.currentTimeMillis() - startTime;
>     if (metadata != null) {
>       System.out.println(
>           "message(" + key + ", " + message + ") sent to partition(" + 
> metadata.partition() +
>               "), " +
>               "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
>     } else {
>       exception.printStackTrace();
>     }
>   }
> }
> {code}
> Kafka config
> {code}
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #    http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> # see kafka.server.KafkaConfig for additional details and defaults
> ############################# Server Basics #############################
> # The id of the broker. This must be set to a unique integer for each broker.
> broker.id=1
> ############################# Socket Server Settings #############################
> listeners=PLAINTEXT://:9092
> # The port the socket server listens on
> #port=9092
> # Hostname the broker will bind to. If not set, the server will bind to all interfaces
> #host.name=localhost
> # Hostname the broker will advertise to producers and consumers. If not set, it uses the
> # value for "host.name" if configured.  Otherwise, it will use the value returned from
> # java.net.InetAddress.getCanonicalHostName().
> #advertised.host.name=<hostname routable by clients>
> # The port to publish to ZooKeeper for clients to use. If this is not set,
> # it will publish the same port that the broker binds to.
> #advertised.port=<port accessible by clients>
> # The number of threads handling network requests
> num.network.threads=3
> # The number of threads doing disk I/O
> num.io.threads=8
> # The send buffer (SO_SNDBUF) used by the socket server
> socket.send.buffer.bytes=102400
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=102400
> # The maximum size of a request that the socket server will accept (protection against OOM)
> socket.request.max.bytes=104857600
> ############################# Log Basics #############################
> # A comma separated list of directories under which to store log files
> log.dirs=/tmp/kafka-logs
> # The default number of log partitions per topic. More partitions allow greater
> # parallelism for consumption, but this will also result in more files across
> # the brokers.
> num.partitions=1
> # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
> # This value is recommended to be increased for installations with data dirs located in RAID array.
> num.recovery.threads.per.data.dir=1
> ############################# Log Flush Policy #############################
> # Messages are immediately written to the filesystem but by default we only fsync() to sync
> # the OS cache lazily. The following configurations control the flush of data to disk.
> # There are a few important trade-offs here:
> #    1. Durability: Unflushed data may be lost if you are not using replication.
> #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
> #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data after a period of time or
> # every N messages (or both). This can be done globally and overridden on a per-topic basis.
> # The number of messages to accept before forcing a flush of data to disk
> #log.flush.interval.messages=10000
> # The maximum amount of time a message can sit in a log before we force a flush
> #log.flush.interval.ms=1000
> ############################# Log Retention Policy #############################
> # The following configurations control the disposal of log segments. The policy can
> # be set to delete segments after a period of time, or after a given size has accumulated.
> # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
> # from the end of the log.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=168
> # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
> # segments don't drop below log.retention.bytes.
> #log.retention.bytes=1073741824
> # The maximum size of a log segment file. When this size is reached a new log segment will be created.
> log.segment.bytes=1073741824
> # The interval at which log segments are checked to see if they can be deleted according
> # to the retention policies
> log.retention.check.interval.ms=300000
> # By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
> # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
> log.cleaner.enable=false
> ############################# Zookeeper #############################
> # Zookeeper connection string (see zookeeper docs for details).
> # This is a comma separated host:port pairs, each corresponding to a zk
> # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
> # You can also append an optional chroot string to the urls to specify the
> # root directory for all kafka znodes.
> zookeeper.connect=kafka1.novalocal:2181,kafka2.novalocal:2181,kafka3.novalocal:2181
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
> unclean.leader.election.enable=false
> {code}
> Zookeeper config
> {code}
> tickTime=2000
> dataDir=/var/lib/zookeeper/data
> clientPort=2181
> initLimit=5
> syncLimit=2
> server.1=kafka1.novalocal:2888:3888
> server.2=kafka2.novalocal:2888:3888
> server.3=kafka3.novalocal:2888:3888
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
