Repository: kafka Updated Branches: refs/heads/trunk c808e8955 -> 43fb2df7a
MINOR: Map `mkString` format updated to the default Java format. This is a minor change, but it helps to improve log readability. Author: Kamal C <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #2709 from Kamal15/util Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43fb2df7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43fb2df7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43fb2df7 Branch: refs/heads/trunk Commit: 43fb2df7a4b2bc7637dcba9436a5435cdcb4fb27 Parents: c808e89 Author: Kamal C <[email protected]> Authored: Thu Mar 30 13:17:09 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu Mar 30 13:23:52 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/CommonClientConfigs.java | 2 +- .../common/requests/CreateTopicsRequest.java | 7 ++- .../common/requests/DeleteRecordsRequest.java | 3 +- .../kafka/common/requests/FetchRequest.java | 3 +- .../common/requests/LeaderAndIsrRequest.java | 2 +- .../common/requests/ListOffsetRequest.java | 5 +-- .../common/requests/OffsetCommitRequest.java | 3 +- .../kafka/common/requests/ProduceRequest.java | 2 +- .../common/requests/UpdateMetadataRequest.java | 4 +- .../org/apache/kafka/common/utils/Utils.java | 47 +++++--------------- .../apache/kafka/common/utils/UtilsTest.java | 2 +- .../internals/StreamPartitionAssignor.java | 2 +- .../apache/kafka/streams/StreamsConfigTest.java | 4 +- .../kafka/tools/ClientCompatibilityTest.java | 3 +- 14 files changed, 30 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 5006ee2..b2c8937 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -73,7 +73,7 @@ public class CommonClientConfigs { public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol"; - public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + Utils.mkString(nonTestingSecurityProtocolNames(), ", ") + "."; + public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + Utils.join(nonTestingSecurityProtocolNames(), ", ") + "."; public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT"; public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms"; http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java index 673810d..072dde8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import 
org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -88,8 +87,8 @@ public class CreateTopicsRequest extends AbstractRequest { StringBuilder bld = new StringBuilder(); bld.append("(numPartitions=").append(numPartitions). append(", replicationFactor=").append(replicationFactor). - append(", replicasAssignments=").append(Utils.mkString(replicasAssignments)). - append(", configs=").append(Utils.mkString(configs)). + append(", replicasAssignments=").append(replicasAssignments). + append(", configs=").append(configs). append(")"); return bld.toString(); } @@ -123,7 +122,7 @@ public class CreateTopicsRequest extends AbstractRequest { public String toString() { StringBuilder bld = new StringBuilder(); bld.append("(type=CreateTopicsRequest"). - append(", topics=").append(Utils.mkString(topics)). + append(", topics=").append(topics). append(", timeout=").append(timeout). append(", validateOnly=").append(validateOnly). 
append(")"); http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java index f204c44..96f064c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -68,7 +67,7 @@ public class DeleteRecordsRequest extends AbstractRequest { StringBuilder builder = new StringBuilder(); builder.append("(type=DeleteRecordsRequest") .append(", timeout=").append(timeout) - .append(", partitionOffsets=(").append(Utils.mkString(partitionOffsets)) + .append(", partitionOffsets=(").append(partitionOffsets) .append("))"); return builder.toString(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 6549f50..8cd2818 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import 
org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -151,7 +150,7 @@ public class FetchRequest extends AbstractRequest { append(", maxWait=").append(maxWait). append(", minBytes=").append(minBytes). append(", maxBytes=").append(maxBytes). - append(", fetchData=").append(Utils.mkString(fetchData)). + append(", fetchData=").append(fetchData). append(")"); return bld.toString(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index f51cfa9..8843755 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -77,7 +77,7 @@ public class LeaderAndIsrRequest extends AbstractRequest { bld.append("(type=LeaderAndIsRequest") .append(", controllerId=").append(controllerId) .append(", controllerEpoch=").append(controllerEpoch) - .append(", partitionStates=").append(Utils.mkString(partitionStates)) + .append(", partitionStates=").append(partitionStates) .append(", liveLeaders=(").append(Utils.join(liveLeaders, ", ")).append(")") .append(")"); return bld.toString(); http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 1d62a96..3327071 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -128,10 +127,10 @@ public class ListOffsetRequest extends AbstractRequest { bld.append("(type=ListOffsetRequest") .append(", replicaId=").append(replicaId); if (offsetData != null) { - bld.append(", offsetData=").append(Utils.mkString(offsetData)); + bld.append(", offsetData=").append(offsetData); } if (partitionTimestamps != null) { - bld.append(", partitionTimestamps=").append(Utils.mkString(partitionTimestamps)); + bld.append(", partitionTimestamps=").append(partitionTimestamps); } bld.append(", minVersion=").append(minVersion); bld.append(")"); http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 6459201..45975d0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; -import 
org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -149,7 +148,7 @@ public class OffsetCommitRequest extends AbstractRequest { append(", memberId=").append(memberId). append(", generationId=").append(generationId). append(", retentionTime=").append(retentionTime). - append(", offsetData=").append(Utils.mkString(offsetData)). + append(", offsetData=").append(offsetData). append(")"); return bld.toString(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 0010ad6..7631391 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -82,7 +82,7 @@ public class ProduceRequest extends AbstractRequest { .append(", magic=").append(magic) .append(", acks=").append(acks) .append(", timeout=").append(timeout) - .append(", partitionRecords=(").append(Utils.mkString(partitionRecords)) + .append(", partitionRecords=(").append(partitionRecords) .append("))"); return bld.toString(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 41b0c84..fc7a33f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -67,8 +67,8 @@ public class 
UpdateMetadataRequest extends AbstractRequest { bld.append("(type: UpdateMetadataRequest="). append(", controllerId=").append(controllerId). append(", controllerEpoch=").append(controllerEpoch). - append(", partitionStates=").append(Utils.mkString(partitionStates)). - append(", liveBrokers=").append(Utils.join(liveBrokers, " ,")). + append(", partitionStates=").append(partitionStates). + append(", liveBrokers=").append(Utils.join(liveBrokers, ", ")). append(")"); return bld.toString(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 696d145..796b019 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -352,43 +352,39 @@ public class Utils { /** * Create a string representation of an array joined by the given separator * @param strs The array of items - * @param seperator The separator + * @param separator The separator * @return The string representation. */ - public static <T> String join(T[] strs, String seperator) { - return join(Arrays.asList(strs), seperator); + public static <T> String join(T[] strs, String separator) { + return join(Arrays.asList(strs), separator); } /** * Create a string representation of a list joined by the given separator * @param list The list of items - * @param seperator The separator + * @param separator The separator * @return The string representation. 
*/ - public static <T> String join(Collection<T> list, String seperator) { + public static <T> String join(Collection<T> list, String separator) { StringBuilder sb = new StringBuilder(); Iterator<T> iter = list.iterator(); while (iter.hasNext()) { sb.append(iter.next()); if (iter.hasNext()) - sb.append(seperator); + sb.append(separator); } return sb.toString(); } - public static <K, V> String mkString(Map<K, V> map) { - return mkString(map, "{", "}", "=", " ,"); - } - public static <K, V> String mkString(Map<K, V> map, String begin, String end, - String keyValueSeparator, String elementSeperator) { + String keyValueSeparator, String elementSeparator) { StringBuilder bld = new StringBuilder(); bld.append(begin); String prefix = ""; for (Map.Entry<K, V> entry : map.entrySet()) { bld.append(prefix).append(entry.getKey()). append(keyValueSeparator).append(entry.getValue()); - prefix = elementSeperator; + prefix = elementSeparator; } bld.append(end); return bld.toString(); @@ -439,7 +435,7 @@ public class Utils { thread.setDaemon(daemon); thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { - log.error("Uncaught exception in thread '" + t.getName() + "':", e); + log.error("Uncaught exception in thread '{}':", t.getName(), e); } }); return thread; @@ -544,25 +540,6 @@ public class Utils { return Arrays.asList(elems); } - /* - * Create a string from a collection - * @param coll the collection - * @param separator the separator - */ - public static <T> CharSequence mkString(Collection<T> coll, String separator) { - StringBuilder sb = new StringBuilder(); - Iterator<T> iter = coll.iterator(); - if (iter.hasNext()) { - sb.append(iter.next().toString()); - - while (iter.hasNext()) { - sb.append(separator); - sb.append(iter.next().toString()); - } - } - return sb; - } - /** * Recursively delete the given file/directory and any subfiles (if any exist) * @@ -624,8 +601,8 @@ public class Utils { } catch 
(IOException outer) { try { Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); - log.debug("Non-atomic move of " + source + " to " + target + " succeeded after atomic move failed due to " - + outer.getMessage()); + log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target, + outer.getMessage()); } catch (IOException inner) { inner.addSuppressed(outer); throw inner; @@ -663,7 +640,7 @@ public class Utils { try { closeable.close(); } catch (Throwable t) { - log.warn("Failed to close " + name, t); + log.warn("Failed to close {}", name, t); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 16742d5..512c29c 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -81,7 +81,7 @@ public class UtilsTest { assertEquals("1", Utils.join(Arrays.asList("1"), ",")); assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ",")); } - + @Test public void testAbs() { assertEquals(0, Utils.abs(Integer.MIN_VALUE)); http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 859d661..24e6709 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -736,7 +736,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } else if (numPartitions != partitions) { final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); Arrays.sort(topics); - throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", threadName, Utils.mkString(Arrays.asList(topics), ","))); + throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", threadName, Utils.join(Arrays.asList(topics), ","))); } } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) { numPartitions = NOT_AVAILABLE; http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index d345cbd..612d7a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -83,7 +83,7 @@ public class StreamsConfigTest { @Test public void defaultSerdeShouldBeConfigured() { - Map<String, Object> serializerConfigs = new HashMap<String, Object>(); + Map<String, Object> serializerConfigs = new HashMap<>(); serializerConfigs.put("key.serializer.encoding", "UTF8"); serializerConfigs.put("value.serializer.encoding", "UTF-16"); Serializer<String> serializer = Serdes.String().serializer(); @@ -103,7 +103,7 @@ public class StreamsConfigTest { @Test public void shouldSupportMultipleBootstrapServers() { List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092"); - String bootstrapServersString = 
Utils.mkString(expectedBootstrapServers, ",").toString(); + String bootstrapServersString = Utils.join(expectedBootstrapServers, ","); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "irrelevant"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersString); http://git-wip-us.apache.org/repos/asf/kafka/blob/43fb2df7/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java index 8274451..2a7d3e6 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java @@ -39,7 +39,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,7 +207,7 @@ public class ClientCompatibilityTest { @Override public String toString() { - return Utils.mkString(result); + return result.toString(); } }
