[
https://issues.apache.org/jira/browse/KAFKA-3112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15103456#comment-15103456
]
Jonathan Bond commented on KAFKA-3112:
--------------------------------------
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
new file mode 100644
index 0000000..a32ff87
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.SystemTime;
+
+/**
+ * Manual stress harness for {@link BufferPool}: worker threads allocate buffers as fast as they can, a
+ * reaper thread hands them back to the pool, and the main thread prints one CSV statistics line per
+ * second. It never exits on its own; it is meant to be run by hand and killed when done.
+ */
+public class BufferPoolTest {
+
+    /** Size in bytes of every buffer the workers request. */
+    private static final int ALLOCATION_SIZE = 1000;
+
+    /** Most buffers the reaper returns to the pool in a single pass. */
+    private static final int MAX_DRAIN = 2048;
+
+    /** Target time in milliseconds between two statistics lines. */
+    private static final long REPORT_INTERVAL_MS = 1000;
+
+    /** Bounds the total bytes for which {@code bp.allocate} calls may be in progress at one time. */
+    private static final Semaphore theGovernator = new Semaphore(256 * 1000);
+
+    /**
+     * Starts the reaper and the worker threads, then prints statistics until the process is killed.
+     *
+     * <p>Each line holds: elapsed ms, attempts per second over the last interval, fraction of attempts
+     * that got a buffer, attempts that got none, attempts that got one, free governor permits, total
+     * attempts, and average nanoseconds per attempt.
+     *
+     * @param args ignored
+     * @throws InterruptedException if the main thread is interrupted while sleeping between reports
+     */
+    public static void main(String[] args) throws InterruptedException {
+        final BufferPool bp = new BufferPool(16 * 1024 * 1024, 16384, false,
+                new Metrics(), new SystemTime(), "metricGroupName", Collections.emptyMap());
+        final AtomicLong allocated = new AtomicLong(0);
+        final AtomicLong bufferFull = new AtomicLong(0);
+        final long numberOfMessages = 1000000000;
+        final AtomicLong messages = new AtomicLong(numberOfMessages);
+        final AtomicLong latency = new AtomicLong();
+        final int threadCount = 16;
+        // Every worker plus the main thread counts down, so no worker allocates before all have started.
+        final CountDownLatch latch = new CountDownLatch(threadCount + 1);
+        final LinkedBlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+
+        Thread reaperThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                List<ByteBuffer> drained = new ArrayList<ByteBuffer>(MAX_DRAIN);
+                while (true) {
+                    try {
+                        Thread.sleep(100);
+                        queue.drainTo(drained, MAX_DRAIN);
+                        for (ByteBuffer buffer : drained) {
+                            bp.deallocate(buffer);
+                        }
+                        drained.clear();
+                    } catch (InterruptedException e) {
+                        // Stop instead of looping on with the interrupt swallowed.
+                        Thread.currentThread().interrupt();
+                        return;
+                    } catch (Exception x) {
+                        System.out.println(x);
+                    }
+                }
+            }
+        });
+        reaperThread.setDaemon(true);
+        reaperThread.start();
+
+        for (int i = 0; i < threadCount; i++) {
+            Thread th = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                        while (messages.getAndDecrement() > 0) {
+                            long begin = System.nanoTime();
+                            try {
+                                ByteBuffer bb = allocate(bp, ALLOCATION_SIZE);
+                                if (bb != null) {
+                                    queue.offer(bb);
+                                    allocated.incrementAndGet();
+                                } else {
+                                    bufferFull.incrementAndGet();
+                                }
+                            } catch (BufferExhaustedException bee) {
+                                bufferFull.incrementAndGet();
+                            } catch (RuntimeException e) {
+                                // Unexpected failures are reported but neither counted nor timed.
+                                System.out.println(e);
+                                continue;
+                            }
+                            latency.addAndGet(System.nanoTime() - begin);
+                        }
+                    } catch (InterruptedException e) {
+                        // Restore the flag and let this worker end.
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            });
+            th.setPriority(Thread.MIN_PRIORITY);
+            th.setDaemon(true);
+            th.start();
+        }
+        // Final count-down: lets every waiting worker proceed.
+        latch.countDown();
+
+        final long start = System.currentTimeMillis();
+        long intervalStart = start;
+        long previousTotal = bufferFull.get() + allocated.get();
+        long nextSleep = REPORT_INTERVAL_MS;
+
+        while (true) {
+            Thread.sleep(nextSleep);
+            long now = System.currentTimeMillis();
+            long fullV = bufferFull.get();
+            long allocateV = allocated.get();
+            long total = fullV + allocateV;
+            long latencyNanos = latency.get();
+            // Divide by finished attempts; guard the case where none has finished yet.
+            long avgLatencyNanos = total > 0 ? latencyNanos / total : 0;
+
+            System.out.println(now - start + ","
+                    + (double) (total - previousTotal) / (double) (now - intervalStart) * 1000 + ","
+                    + (double) allocateV / (double) total + "," + fullV + "," + allocateV + ","
+                    + theGovernator.availablePermits() + "," + total + "," + avgLatencyNanos);
+
+            intervalStart = now;
+            previousTotal = total;
+            // Thread.sleep rejects negative values, so a slow iteration must not push this below zero.
+            nextSleep = Math.max(0, REPORT_INTERVAL_MS - (System.currentTimeMillis() - now));
+        }
+    }
+
+    /**
+     * Allocates {@code size} bytes from {@code bp} if the governor has that many permits free.
+     *
+     * @param bp pool to allocate from
+     * @param size number of bytes to allocate, which is also the number of permits requested
+     * @return the allocated buffer, or {@code null} if the permits could not be acquired
+     * @throws InterruptedException propagated from {@code bp.allocate}
+     */
+    private static ByteBuffer allocate(BufferPool bp, int size) throws InterruptedException {
+        if (!theGovernator.tryAcquire(size)) {
+            return null;
+        }
+        try {
+            return bp.allocate(size);
+        } finally {
+            // The permits cover only the allocate call itself and are handed back even if it throws.
+            theGovernator.release(size);
+        }
+    }
+
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/AvgTest.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/AvgTest.java
new file mode 100644
index 0000000..ea8efd2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/AvgTest.java
@@ -0,0 +1,26 @@
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/** Manual probe of {@link Avg}: prints its measurement while values arrive and after they stop. */
+public class AvgTest {
+
+    /** Records 0..14 one second apart, then prints for 150 seconds whether the measurement is finite. */
+    public static void main(String[] args) throws Exception {
+        final Avg average = new Avg();
+        final MetricConfig config = new MetricConfig();
+
+        for (int value = 0; value < 15; value++) {
+            average.record(config, value, System.currentTimeMillis());
+            System.out.println(value + " " + average.measure(config, System.currentTimeMillis()));
+            Thread.sleep(1000);
+        }
+
+        for (int second = 0; second < 150; second++) {
+            final double measured = average.measure(config, System.currentTimeMillis());
+            System.out.println(Double.isFinite(measured));
+            Thread.sleep(1000);
+        }
+    }
+
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
index d5af354..05143ac 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
@@ -39,16 +39,18 @@
throw new ConfigException("Invalid url in " +
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
try {
InetSocketAddress address = new InetSocketAddress(host,
port);
- if (address.isUnresolved())
- throw new ConfigException("DNS resolution failed for
url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
- addresses.add(address);
+ if (address.isUnresolved()) {
+ log.warn("DNS resolution failed for url in " +
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ } else {
+ addresses.add(address);
+ }
} catch (NumberFormatException e) {
throw new ConfigException("Invalid port in " +
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
}
}
}
if (addresses.size() < 1)
- throw new ConfigException("No bootstrap urls given in " +
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ throw new ConfigException("No resolvable bootstrap urls given in "
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
return addresses;
}
> One incorrect bootstrap server will prevent Kafka producer from opening
> -----------------------------------------------------------------------
>
> Key: KAFKA-3112
> URL: https://issues.apache.org/jira/browse/KAFKA-3112
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.8.2.1
> Reporter: Jonathan Bond
> Priority: Critical
>
> If any of the servers specified in bootstrap.servers cannot be resolved
> through DNS, the configuration is treated as an error and the client won't
> start up. We pass in 30 possible servers, and one had an issue, so the client
> wouldn't start.
> It would be better if the client attempted to start as long as at least
> one server can be resolved through DNS.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)