MINOR: syntax brush for java / bash / json / text

Author: Guozhang Wang <[email protected]>

Reviewers: Derrick Or <[email protected]>, Ismael Juma <[email protected]>

Closes #3214 from guozhangwang/KMinor-doc-java-brush


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57b0d0fe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57b0d0fe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57b0d0fe

Branch: refs/heads/trunk
Commit: 57b0d0fe572399048e523d54beafa3520a708cf5
Parents: f8d6dba
Author: Guozhang Wang <[email protected]>
Authored: Wed Jun 7 15:17:25 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed Jun 7 15:17:25 2017 +0100

----------------------------------------------------------------------
 docs/api.html            |   8 +-
 docs/configuration.html  |  19 ++---
 docs/connect.html        |  36 ++++-----
 docs/design.html         |   6 +-
 docs/implementation.html |  34 ++++----
 docs/ops.html            | 107 ++++++++++++-------------
 docs/quickstart.html     | 176 +++++++++++++++++++++---------------------
 docs/security.html       | 110 +++++++++++++-------------
 docs/streams.html        |  32 ++++----
 9 files changed, 268 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/57b0d0fe/docs/api.html
----------------------------------------------------------------------
diff --git a/docs/api.html b/docs/api.html
index 5f47c3a..7966b90 100644
--- a/docs/api.html
+++ b/docs/api.html
@@ -35,7 +35,7 @@
        <p>
        To use the producer, you can use the following maven dependency:
 
-       <pre>
+       <pre class="brush: xml;">
                &lt;dependency&gt;
                        &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
                        &lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
@@ -51,7 +51,7 @@
        <a 
href="/{{version}}/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html"
 title="Kafka {{dotVersion}} Javadoc">javadocs</a>.
        <p>
        To use the consumer, you can use the following maven dependency:
-       <pre>
+       <pre class="brush: xml;">
                &lt;dependency&gt;
                        &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
                        &lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
@@ -70,7 +70,7 @@
        <p>
        To use Kafka Streams you can use the following maven dependency:
 
-       <pre>
+       <pre class="brush: xml;">
                &lt;dependency&gt;
                        &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
                        &lt;artifactId&gt;kafka-streams&lt;/artifactId&gt;
@@ -92,7 +92,7 @@
        The AdminClient API supports managing and inspecting topics, brokers, 
acls, and other Kafka objects.
        <p>
        To use the AdminClient API, add the following Maven dependency:
-       <pre>
+       <pre class="brush: xml;">
                &lt;dependency&gt;
                        &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
                        &lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;

http://git-wip-us.apache.org/repos/asf/kafka/blob/57b0d0fe/docs/configuration.html
----------------------------------------------------------------------
diff --git a/docs/configuration.html b/docs/configuration.html
index 2cad283..ea4a5bb 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -36,23 +36,24 @@
   <a id="topic-config" href="#topic-config">Topic-level configuration</a>
 
   Configurations pertinent to topics have both a server default as well an 
optional per-topic override. If no per-topic configuration is given the server 
default is used. The override can be set at topic creation time by giving one 
or more <code>--config</code> options. This example creates a topic named 
<i>my-topic</i> with a custom max message size and flush rate:
-  <pre>
-  <b> &gt; bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic 
my-topic --partitions 1
-          --replication-factor 1 --config max.message.bytes=64000 --config 
flush.messages=1</b>
+  <pre class="brush: bash;">
+  &gt; bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic 
my-topic --partitions 1
+      --replication-factor 1 --config max.message.bytes=64000 --config 
flush.messages=1
   </pre>
   Overrides can also be changed or set later using the alter configs command. 
This example updates the max message size for <i>my-topic</i>:
-  <pre>
-  <b> &gt; bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type 
topics --entity-name my-topic --alter --add-config max.message.bytes=128000</b>
+  <pre class="brush: bash;">
+  &gt; bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics 
--entity-name my-topic
+      --alter --add-config max.message.bytes=128000
   </pre>
 
   To check overrides set on the topic you can do
-  <pre>
-  <b> &gt; bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type 
topics --entity-name my-topic --describe</b>
+  <pre class="brush: bash;">
+  &gt; bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics 
--entity-name my-topic --describe
   </pre>
 
   To remove an override you can do
-  <pre>
-  <b> &gt; bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type 
topics --entity-name my-topic --alter --delete-config max.message.bytes</b>
+  <pre class="brush: bash;">
+  &gt; bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics 
--entity-name my-topic --alter --delete-config max.message.bytes
   </pre>
 
   The following are the topic-level configurations. The server's default 
configuration for this property is given under the Server Default Property 
heading. A given server default config value only applies to a topic if it does 
not have an explicit topic config override.

http://git-wip-us.apache.org/repos/asf/kafka/blob/57b0d0fe/docs/connect.html
----------------------------------------------------------------------
diff --git a/docs/connect.html b/docs/connect.html
index 48c5139..57505d6 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -40,7 +40,7 @@
 
     In standalone mode all work is performed in a single process. This 
configuration is simpler to setup and get started with and may be useful in 
situations where only one worker makes sense (e.g. collecting log files), but 
it does not benefit from some of the features of Kafka Connect such as fault 
tolerance. You can start a standalone process with the following command:
 
-    <pre>
+    <pre class="brush: bash;">
     &gt; bin/connect-standalone.sh config/connect-standalone.properties 
connector1.properties [connector2.properties ...]
     </pre>
 
@@ -62,7 +62,7 @@
 
     Distributed mode handles automatic balancing of work, allows you to scale 
up (or down) dynamically, and offers fault tolerance both in the active tasks 
and for configuration and offset commit data. Execution is very similar to 
standalone mode:
 
-    <pre>
+    <pre class="brush: bash;">
     &gt; bin/connect-distributed.sh config/connect-distributed.properties
     </pre>
 
@@ -118,7 +118,7 @@
 
     <p>Throughout the example we'll use schemaless JSON data format. To use 
schemaless format, we changed the following two lines in 
<code>connect-standalone.properties</code> from true to false:</p>
 
-    <pre>
+    <pre class="brush: text;">
         key.converter.schemas.enable
         value.converter.schemas.enable
     </pre>
@@ -131,7 +131,7 @@
 
     After adding the transformations, 
<code>connect-file-source.properties</code> file looks as following:
 
-    <pre>
+    <pre class="brush: text;">
         name=local-file-source
         connector.class=FileStreamSource
         tasks.max=1
@@ -149,7 +149,7 @@
 
     When we ran the file source connector on my sample file without the 
transformations, and then read them using 
<code>kafka-console-consumer.sh</code>, the results were:
 
-    <pre>
+    <pre class="brush: text;">
         "foo"
         "bar"
         "hello world"
@@ -157,7 +157,7 @@
 
     We then create a new file connector, this time after adding the 
transformations to the configuration file. This time, the results will be:
 
-    <pre>
+    <pre class="brush: json;">
         {"line":"foo","data_source":"test-file-source"}
         {"line":"bar","data_source":"test-file-source"}
         {"line":"hello world","data_source":"test-file-source"}
@@ -247,7 +247,7 @@
 
     We'll cover the <code>SourceConnector</code> as a simple example. 
<code>SinkConnector</code> implementations are very similar. Start by creating 
the class that inherits from <code>SourceConnector</code> and add a couple of 
fields that will store parsed configuration information (the filename to read 
from and the topic to send data to):
 
-    <pre>
+    <pre class="brush: java;">
     public class FileStreamSourceConnector extends SourceConnector {
         private String filename;
         private String topic;
@@ -255,7 +255,7 @@
 
     The easiest method to fill in is <code>getTaskClass()</code>, which 
defines the class that should be instantiated in worker processes to actually 
read the data:
 
-    <pre>
+    <pre class="brush: java;">
     @Override
     public Class&lt;? extends Task&gt; getTaskClass() {
         return FileStreamSourceTask.class;
@@ -264,7 +264,7 @@
 
     We will define the <code>FileStreamSourceTask</code> class below. Next, we 
add some standard lifecycle methods, <code>start()</code> and 
<code>stop()</code>:
 
-    <pre>
+    <pre class="brush: java;">
     @Override
     public void start(Map&lt;String, String&gt; props) {
         // The complete version includes error handling as well.
@@ -282,7 +282,7 @@
     handling a single file, so even though we may be permitted to generate 
more tasks as per the
     <code>maxTasks</code> argument, we return a list with only one entry:
 
-    <pre>
+    <pre class="brush: java;">
     @Override
     public List&lt;Map&lt;String, String&gt;&gt; taskConfigs(int maxTasks) {
         ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new 
ArrayList&lt;&gt;();
@@ -310,7 +310,7 @@
     Just as with the connector, we need to create a class inheriting from the 
appropriate base <code>Task</code> class. It also has some standard lifecycle 
methods:
 
 
-    <pre>
+    <pre class="brush: java;">
     public class FileStreamSourceTask extends SourceTask {
         String filename;
         InputStream stream;
@@ -333,7 +333,7 @@
 
     Next, we implement the main functionality of the task, the 
<code>poll()</code> method which gets events from the input system and returns 
a <code>List&lt;SourceRecord&gt;</code>:
 
-    <pre>
+    <pre class="brush: java;">
     @Override
     public List&lt;SourceRecord&gt; poll() throws InterruptedException {
         try {
@@ -365,7 +365,7 @@
 
     The previous section described how to implement a simple 
<code>SourceTask</code>. Unlike <code>SourceConnector</code> and 
<code>SinkConnector</code>, <code>SourceTask</code> and <code>SinkTask</code> 
have very different interfaces because <code>SourceTask</code> uses a pull 
interface and <code>SinkTask</code> uses a push interface. Both share the 
common lifecycle methods, but the <code>SinkTask</code> interface is quite 
different:
 
-    <pre>
+    <pre class="brush: java;">
     public abstract class SinkTask implements Task {
         public void initialize(SinkTaskContext context) {
             this.context = context;
@@ -388,7 +388,7 @@
 
     To correctly resume upon startup, the task can use the 
<code>SourceContext</code> passed into its <code>initialize()</code> method to 
access the offset data. In <code>initialize()</code>, we would add a bit more 
code to read the offset (if it exists) and seek to that position:
 
-    <pre>
+    <pre class="brush: java;">
         stream = new FileInputStream(filename);
         Map&lt;String, Object&gt; offset = 
context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, 
filename));
         if (offset != null) {
@@ -406,7 +406,7 @@
 
     Source connectors need to monitor the source system for changes, e.g. 
table additions/deletions in a database. When they pick up changes, they should 
notify the framework via the <code>ConnectorContext</code> object that 
reconfiguration is necessary. For example, in a <code>SourceConnector</code>:
 
-    <pre>
+    <pre class="brush: java;">
         if (inputsChanged())
             this.context.requestTaskReconfiguration();
     </pre>
@@ -423,7 +423,7 @@
 
     The following code in <code>FileStreamSourceConnector</code> defines the 
configuration and exposes it to the framework.
 
-    <pre>
+    <pre class="brush: java;">
         private static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source 
filename.")
             .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to 
publish data to");
@@ -445,7 +445,7 @@
 
     The API documentation provides a complete reference, but here is a simple 
example creating a <code>Schema</code> and <code>Struct</code>:
 
-    <pre>
+    <pre class="brush: java;">
     Schema schema = SchemaBuilder.struct().name(NAME)
         .field("name", Schema.STRING_SCHEMA)
         .field("age", Schema.INT_SCHEMA)
@@ -473,7 +473,7 @@
     When a connector is first submitted to the cluster, the workers rebalance 
the full set of connectors in the cluster and their tasks so that each worker 
has approximately the same amount of work. This same rebalancing procedure is 
also used when connectors increase or decrease the number of tasks they 
require, or when a connector's configuration is changed. You can use the REST 
API to view the current status of a connector and its tasks, including the id 
of the worker to which each was assigned. For example, querying the status of a 
file source (using <code>GET /connectors/file-source/status</code>) might 
produce output like the following:
     </p>
 
-    <pre>
+    <pre class="brush: json;">
     {
     "name": "file-source",
     "connector": {

http://git-wip-us.apache.org/repos/asf/kafka/blob/57b0d0fe/docs/design.html
----------------------------------------------------------------------
diff --git a/docs/design.html b/docs/design.html
index c061cf4..0373a2d 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -417,7 +417,7 @@
     <p>
     Let's discuss a concrete example of such a stream. Say we have a topic 
containing user email addresses; every time a user updates their email address 
we send a message to this topic using their user id as the
     primary key. Now say we send the following messages over some time period 
for a user with id 123, each message corresponding to a change in email address 
(messages for other ids are omitted):
-    <pre>
+    <pre class="brush: text;">
         123 => [email protected]
                 .
                 .
@@ -510,11 +510,11 @@
 
     The log cleaner is enabled by default. This will start the pool of cleaner 
threads. 
     To enable log cleaning on a particular topic you can add the log-specific 
property
-    <pre>  log.cleanup.policy=compact</pre>
+    <pre class="brush: text;">  log.cleanup.policy=compact</pre>
     This can be done either at topic creation time or using the alter topic 
command.
     <p>
     The log cleaner can be configured to retain a minimum amount of the 
uncompacted "head" of the log. This is enabled by setting the compaction time 
lag.
-    <pre>  log.cleaner.min.compaction.lag.ms</pre>
+    <pre class="brush: text;">  log.cleaner.min.compaction.lag.ms</pre>
 
     This can be used to prevent messages newer than a minimum message age from 
being subject to compaction. If not set, all log segments are eligible for 
compaction except for the last segment, i.e. the one currently
     being written to. The active segment will not be compacted even if all of 
its messages are older than the minimum compaction time lag.

http://git-wip-us.apache.org/repos/asf/kafka/blob/57b0d0fe/docs/implementation.html
----------------------------------------------------------------------
diff --git a/docs/implementation.html b/docs/implementation.html
index 48602f2..5ff75ec 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -22,8 +22,8 @@
 
     <p>
     The Producer API that wraps the 2 low-level producers - 
<code>kafka.producer.SyncProducer</code> and 
<code>kafka.producer.async.AsyncProducer</code>.
-    <pre>
-    class Producer<T> {
+    <pre class="brush: java;">
+    class Producer&lt;T&gt; {
 
     /* Sends the data, partitioned by key to the topic using either the */
     /* synchronous or the asynchronous producer */
@@ -48,7 +48,7 @@
     </p>
     </li>
     <li>handles the serialization of data through a user-specified 
<code>Encoder</code>:
-    <pre>
+    <pre class="brush: java;">
     interface Encoder&lt;T&gt; {
     public Message toMessage(T data);
     }
@@ -58,7 +58,7 @@
     <li>provides software load balancing through an optionally user-specified 
<code>Partitioner</code>:
     <p>
     The routing decision is influenced by the 
<code>kafka.producer.Partitioner</code>.
-    <pre>
+    <pre class="brush: java;">
     interface Partitioner&lt;T&gt; {
     int partition(T key, int numPartitions);
     }
@@ -78,7 +78,7 @@
     </p>
 
     <h5><a id="impl_lowlevel" href="#impl_lowlevel">Low-level API</a></h5>
-    <pre>
+    <pre class="brush: java;">
     class SimpleConsumer {
 
     /* Send fetch request to a broker and get back a set of messages. */
@@ -101,7 +101,7 @@
     The low-level API is used to implement the high-level API as well as being 
used directly for some of our offline consumers which have particular 
requirements around maintaining state.
 
     <h5><a id="impl_highlevel" href="#impl_highlevel">High-level API</a></h5>
-    <pre>
+    <pre class="brush: java;">
 
     /* create a connection to the cluster */
     ConsumerConnector connector = Consumer.create(consumerConfig);
@@ -156,8 +156,8 @@
 
     <h3><a id="messageformat" href="#messageformat">5.4 Message Format</a></h3>
 
-    <pre>
-        /**
+    <pre class="brush: java;">
+       /**
         * 1. 4 byte CRC32 of the message
         * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
         * 3. 1 byte "attributes" identifier to allow annotations on the 
message independent of the version
@@ -185,7 +185,7 @@
     <p>
     The exact binary format for messages is versioned and maintained as a 
standard interface so message sets can be transferred between producer, broker, 
and client without recopying or conversion when desirable. This format is as 
follows:
     </p>
-    <pre>
+    <pre class="brush: java;">
     On-disk format of a message
 
     offset         : 8 bytes 
@@ -220,7 +220,7 @@
 
     <p> The following is the format of the results sent to the consumer.
 
-    <pre>
+    <pre class="brush: text;">
     MessageSetSend (fetch result)
 
     total length     : 4 bytes
@@ -230,7 +230,7 @@
     message n        : x bytes
     </pre>
 
-    <pre>
+    <pre class="brush: text;">
     MultiMessageSetSend (multiFetch result)
 
     total length       : 4 bytes
@@ -292,7 +292,7 @@
     </p>
 
     <h4><a id="impl_zkbroker" href="#impl_zkbroker">Broker Node 
Registry</a></h4>
-    <pre>
+    <pre class="brush: json;">
     /brokers/ids/[0...N] --> 
{"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...}
 (ephemeral node)
     </pre>
     <p>
@@ -302,7 +302,7 @@
     Since the broker registers itself in ZooKeeper using ephemeral znodes, 
this registration is dynamic and will disappear if the broker is shutdown or 
dies (thus notifying consumers it is no longer available).
     </p>
     <h4><a id="impl_zktopic" href="#impl_zktopic">Broker Topic 
Registry</a></h4>
-    <pre>
+    <pre class="brush: json;">
     /brokers/topics/[topic]/partitions/[0...N]/state --> 
{"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]}
 (ephemeral node)
     </pre>
 
@@ -327,7 +327,7 @@
     <h4><a id="impl_zkconsumerid" href="#impl_zkconsumerid">Consumer Id 
Registry</a></h4>
     <p>
     In addition to the group_id which is shared by all consumers in a group, 
each consumer is given a transient, unique consumer_id (of the form 
hostname:uuid) for identification purposes. Consumer ids are registered in the 
following directory.
-    <pre>
+    <pre class="brush: json;">
     /consumers/[group_id]/ids/[consumer_id] --> 
{"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} 
(ephemeral node)
     </pre>
     Each of the consumers in the group registers under its group and creates a 
znode with its consumer_id. The value of the znode contains a map of &lt;topic, 
#streams&gt;. This id is simply used to identify each of the consumers which is 
currently active within a group. This is an ephemeral node so it will disappear 
if the consumer process dies.
@@ -337,7 +337,7 @@
     <p>
     Consumers track the maximum offset they have consumed in each partition. 
This value is stored in a ZooKeeper directory if 
<code>offsets.storage=zookeeper</code>.
     </p>
-    <pre>
+    <pre class="brush: json;">
     /consumers/[group_id]/offsets/[topic]/[partition_id] --> 
offset_counter_value (persistent node)
     </pre>
 
@@ -347,7 +347,7 @@
     Each broker partition is consumed by a single consumer within a given 
consumer group. The consumer must establish its ownership of a given partition 
before any consumption can begin. To establish its ownership, a consumer writes 
its own id in an ephemeral node under the particular broker partition it is 
claiming.
     </p>
 
-    <pre>
+    <pre class="brush: json;">
     /consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id 
(ephemeral node)
     </pre>
 
@@ -389,7 +389,7 @@
     <p>
     Each consumer does the following during rebalancing:
     </p>
-    <pre>
+    <pre class="brush: text;">
     1. For each topic T that C<sub>i</sub> subscribes to
     2.   let P<sub>T</sub> be all partitions producing topic T
     3.   let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub> 
that consume topic T

http://git-wip-us.apache.org/repos/asf/kafka/blob/57b0d0fe/docs/ops.html
----------------------------------------------------------------------
diff --git a/docs/ops.html b/docs/ops.html
index 46edb19..94b6c2d 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -27,7 +27,7 @@
   You have the option of either adding topics manually or having them be 
created automatically when data is first published to a non-existent topic. If 
topics are auto-created then you may want to tune the default <a 
href="#topic-config">topic configurations</a> used for auto-created topics.
   <p>
   Topics are added and modified using the topic tool:
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic 
my_topic_name
         --partitions 20 --replication-factor 3 --config x=y
   </pre>
@@ -44,26 +44,26 @@
   You can change the configuration or partitioning of a topic using the same 
topic tool.
   <p>
   To add partitions you can do
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic 
my_topic_name
         --partitions 40
   </pre>
   Be aware that one use case for partitions is to semantically partition data, 
and adding partitions doesn't change the partitioning of existing data so this 
may disturb consumers if they rely on that partition. That is if data is 
partitioned by <code>hash(key) % number_of_partitions</code> then this 
partitioning will potentially be shuffled by adding partitions but Kafka will 
not attempt to automatically redistribute data in any way.
   <p>
   To add configs:
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic 
my_topic_name --config x=y
   </pre>
   To remove a config:
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic 
my_topic_name --delete-config x
   </pre>
   And finally deleting a topic:
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic 
my_topic_name
   </pre>
   Topic deletion option is disabled by default. To enable it set the server 
config
-    <pre>delete.topic.enable=true</pre>
+    <pre class="brush: text;">delete.topic.enable=true</pre>
   <p>
   Kafka does not currently support reducing the number of partitions for a 
topic.
   <p>
@@ -80,7 +80,7 @@
   </ol>
 
   Syncing the logs will happen automatically whenever the server is stopped 
other than by a hard kill, but the controlled leadership migration requires 
using a special setting:
-  <pre>
+  <pre class="brush: text;">
       controlled.shutdown.enable=true
   </pre>
   Note that controlled shutdown will only succeed if <i>all</i> the partitions 
hosted on the broker have replicas (i.e. the replication factor is greater than 
1 <i>and</i> at least one of these replicas is alive). This is generally what 
you want since shutting down the last replica would make that topic partition 
unavailable.
@@ -90,12 +90,12 @@
   Whenever a broker stops or crashes leadership for that broker's partitions 
transfers to other replicas. This means that by default when the broker is 
restarted it will only be a follower for all its partitions, meaning it will 
not be used for client reads and writes.
   <p>
   To avoid this imbalance, Kafka has a notion of preferred replicas. If the 
list of replicas for a partition is 1,5,9 then node 1 is preferred as the 
leader to either node 5 or 9 because it is earlier in the replica list. You can 
have the Kafka cluster try to restore leadership to the restored replicas by 
running the command:
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
   </pre>
 
   Since running this command can be tedious you can also configure Kafka to do 
this automatically by setting the following configuration:
-  <pre>
+  <pre class="brush: text;">
       auto.leader.rebalance.enable=true
   </pre>
 
@@ -103,7 +103,7 @@
   The rack awareness feature spreads replicas of the same partition across 
different racks. This extends the guarantees Kafka provides for broker-failure 
to cover rack-failure, limiting the risk of data loss should all the brokers on 
a rack fail at once. The feature can also be applied to other broker groupings 
such as availability zones in EC2.
   <p></p>
   You can specify that a broker belongs to a particular rack by adding a 
property to the broker config:
-  <pre>   broker.rack=my-rack-id</pre>
+  <pre class="brush: text;">   broker.rack=my-rack-id</pre>
   When a topic is <a href="#basic_ops_add_topic">created</a>, <a 
href="#basic_ops_modify_topic">modified</a> or replicas are <a 
href="#basic_ops_cluster_expansion">redistributed</a>, the rack constraint will 
be honoured, ensuring replicas span as many racks as they can (a partition will 
span min(#racks, replication-factor) different racks).
   <p></p>
   The algorithm used to assign replicas to brokers ensures that the number of 
leaders per broker will be constant, regardless of how brokers are distributed 
across racks. This ensures balanced throughput.
@@ -123,7 +123,7 @@
   The source and destination clusters are completely independent entities: 
they can have different numbers of partitions and the offsets will not be the 
same. For this reason the mirror cluster is not really intended as a 
fault-tolerance mechanism (as the consumer position will be different); for 
that we recommend using normal in-cluster replication. The mirror maker process 
will, however, retain and use the message key for partitioning so order is 
preserved on a per-key basis.
   <p>
   Here is an example showing how to mirror a single topic (named 
<i>my-topic</i>) from an input cluster:
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-mirror-maker.sh
         --consumer.config consumer.properties
         --producer.config producer.properties --whitelist my-topic
@@ -139,7 +139,7 @@
 
   <h4><a id="basic_ops_consumer_lag" href="#basic_ops_consumer_lag">Checking 
consumer position</a></h4>
   Sometimes it's useful to see the position of your consumers. We have a tool 
that will show the position of all consumers in a consumer group as well as how 
far behind the end of the log they are. To run this tool on a consumer group 
named <i>my-group</i> consuming a topic named <i>my-topic</i> would look like 
this:
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 
localhost:2181 --group test
   Group           Topic                          Pid Offset          logSize   
      Lag             Owner
   my-group        my-topic                       0   0               0         
      0               test_jkreps-mn-1394154511599-60744496-0
@@ -157,7 +157,7 @@
 
   For example, to list all consumer groups across all topics:
 
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
 
   test-consumer-group
@@ -165,7 +165,7 @@
 
   To view offsets as in the previous example with the ConsumerOffsetChecker, 
we "describe" the consumer group like this:
 
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe 
--group test-consumer-group
 
   TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  
LAG        CONSUMER-ID                                       HOST               
            CLIENT-ID
@@ -175,7 +175,7 @@
   If you are using the old high-level consumer and storing the group metadata 
in ZooKeeper (i.e. <code>offsets.storage=zookeeper</code>), pass
   <code>--zookeeper</code> instead of <code>bootstrap-server</code>:
 
-  <pre>
+  <pre class="brush: bash;">
   &gt; bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
   </pre>
 
@@ -199,7 +199,7 @@
   For instance, the following example will move all partitions for topics 
foo1,foo2 to the new set of brokers 5,6. At the end of this move, all 
partitions for topics foo1 and foo2 will <i>only</i> exist on brokers 5,6.
   <p>
   Since the tool accepts the input list of topics as a json file, you first 
need to identify the topics you want to move and create the json file as 
follows:
-  <pre>
+  <pre class="brush: bash;">
   > cat topics-to-move.json
   {"topics": [{"topic": "foo1"},
               {"topic": "foo2"}],
@@ -207,7 +207,7 @@
   }
   </pre>
   Once the json file is ready, use the partition reassignment tool to generate 
a candidate assignment:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
   Current partition replica assignment
 
@@ -233,7 +233,7 @@
   </pre>
   <p>
   The tool generates a candidate assignment that will move all partitions from 
topics foo1,foo2 to brokers 5,6. Note, however, that at this point, the 
partition movement has not started, it merely tells you the current assignment 
and the proposed new assignment. The current assignment should be saved in case 
you want to rollback to it. The new assignment should be saved in a json file 
(e.g. expand-cluster-reassignment.json) to be input to the tool with the 
--execute option as follows:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file expand-cluster-reassignment.json --execute
   Current partition replica assignment
 
@@ -259,7 +259,7 @@
   </pre>
   <p>
   Finally, the --verify option can be used with the tool to check the status 
of the partition reassignment. Note that the same 
expand-cluster-reassignment.json (used with the --execute option) should be 
used with the --verify option:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file expand-cluster-reassignment.json --verify
   Status of partition reassignment:
   Reassignment of partition [foo1,0] completed successfully
@@ -276,12 +276,12 @@
   For instance, the following example moves partition 0 of topic foo1 to 
brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:
   <p>
   The first step is to hand craft the custom reassignment plan in a json file:
-  <pre>
+  <pre class="brush: bash;">
   > cat custom-reassignment.json
   
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
   </pre>
   Then, use the json file with the --execute option to start the reassignment 
process:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file custom-reassignment.json --execute
   Current partition replica assignment
 
@@ -299,8 +299,8 @@
   </pre>
   <p>
   The --verify option can be used with the tool to check the status of the 
partition reassignment. Note that the same expand-cluster-reassignment.json 
(used with the --execute option) should be used with the --verify option:
-  <pre>
-  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file custom-reassignment.json --verify
+  <pre class="brush: bash;">
+  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file custom-reassignment.json --verify
   Status of partition reassignment:
   Reassignment of partition [foo1,0] completed successfully
   Reassignment of partition [foo2,1] completed successfully
@@ -315,13 +315,13 @@
   For instance, the following example increases the replication factor of 
partition 0 of topic foo from 1 to 3. Before increasing the replication factor, 
the partition's only replica existed on broker 5. As part of increasing the 
replication factor, we will add more replicas on brokers 6 and 7.
   <p>
   The first step is to hand craft the custom reassignment plan in a json file:
-  <pre>
+  <pre class="brush: bash;">
   > cat increase-replication-factor.json
   {"version":1,
   "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
   </pre>
   Then, use the json file with the --execute option to start the reassignment 
process:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file increase-replication-factor.json --execute
   Current partition replica assignment
 
@@ -335,13 +335,13 @@
   </pre>
   <p>
   The --verify option can be used with the tool to check the status of the 
partition reassignment. Note that the same increase-replication-factor.json 
(used with the --execute option) should be used with the --verify option:
-  <pre>
-  bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file increase-replication-factor.json --verify
+  <pre class="brush: bash;">
+  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
--reassignment-json-file increase-replication-factor.json --verify
   Status of partition reassignment:
   Reassignment of partition [foo,0] completed successfully
   </pre>
   You can also verify the increase in replication factor with the kafka-topics 
tool:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
   Topic:foo    PartitionCount:1        ReplicationFactor:3     Configs:
     Topic: foo Partition: 0    Leader: 5       Replicas: 5,6,7 Isr: 5,6,7
@@ -353,13 +353,13 @@
   There are two interfaces that can be used to engage a throttle. The 
simplest, and safest, is to apply a throttle when invoking the 
kafka-reassign-partitions.sh, but kafka-configs.sh can also be used to view and 
alter the throttle values directly.
   <p></p>
   So for example, if you were to execute a rebalance, with the below command, 
it would move partitions at no more than 50MB/s.
-  <pre>$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181--execute 
--reassignment-json-file bigger-cluster.json —throttle 50000000</pre>
+  <pre class="brush: bash;">$ bin/kafka-reassign-partitions.sh --zookeeper 
myhost:2181--execute --reassignment-json-file bigger-cluster.json —throttle 
50000000</pre>
   When you execute this script you will see the throttle engage:
-  <pre>
+  <pre class="brush: bash;">
   The throttle limit was set to 50000000 B/s
   Successfully started reassignment of partitions.</pre>
   <p>Should you wish to alter the throttle, during a rebalance, say to 
increase the throughput so it completes quicker, you can do this by re-running 
the execute command passing the same reassignment-json-file:</p>
-  <pre>$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  
--execute --reassignment-json-file bigger-cluster.json --throttle 700000000
+  <pre class="brush: bash;">$ bin/kafka-reassign-partitions.sh --zookeeper 
localhost:2181  --execute --reassignment-json-file bigger-cluster.json 
--throttle 700000000
   There is an existing assignment running.
   The throttle limit was set to 700000000 B/s</pre>
 
@@ -369,7 +369,8 @@
       the --verify option. Failure to do so could cause regular replication 
traffic to be throttled. </p>
   <p>When the --verify option is executed, and the reassignment has completed, 
the script will confirm that the throttle was removed:</p>
 
-  <pre>$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify 
--reassignment-json-file bigger-cluster.json
+  <pre class="brush: bash;">
+  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify 
--reassignment-json-file bigger-cluster.json
   Status of partition reassignment:
   Reassignment of partition [my-topic,1] completed successfully
   Reassignment of partition [mytopic,0] completed successfully
@@ -379,19 +380,20 @@
       configuration used to manage the throttling process. The throttle value 
itself. This is configured, at a broker
       level, using the dynamic properties: </p>
 
-  <pre>leader.replication.throttled.rate
+  <pre class="brush: text;">leader.replication.throttled.rate
   follower.replication.throttled.rate</pre>
 
   <p>There is also an enumerated set of throttled replicas: </p>
 
-  <pre>leader.replication.throttled.replicas
+  <pre class="brush: text;">leader.replication.throttled.replicas
   follower.replication.throttled.replicas</pre>
 
   <p>Which are configured per topic. All four config values are automatically 
assigned by kafka-reassign-partitions.sh
       (discussed below). </p>
   <p>To view the throttle limit configuration:</p>
 
-  <pre>$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 
--entity-type brokers
+  <pre class="brush: bash;">
+  > bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type 
brokers
   Configs for brokers '2' are 
leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
   Configs for brokers '1' are 
leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000</pre>
 
@@ -400,7 +402,8 @@
 
   <p>To view the list of throttled replicas:</p>
 
-  <pre>$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 
--entity-type topics
+  <pre class="brush: bash;">
+  > bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type 
topics
   Configs for topic 'my-topic' are 
leader.replication.throttled.replicas=1:102,0:101,
       follower.replication.throttled.replicas=1:101,0:102</pre>
 
@@ -448,19 +451,19 @@
   It is possible to set custom quotas for each (user, client-id), user or 
client-id group.
   <p>
   Configure custom quota for (user=user1, client-id=clientA):
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 
'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' 
--entity-type users --entity-name user1 --entity-type clients --entity-name 
clientA
   Updated config for entity: user-principal 'user1', client-id 'clientA'.
   </pre>
 
   Configure custom quota for user=user1:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 
'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' 
--entity-type users --entity-name user1
   Updated config for entity: user-principal 'user1'.
   </pre>
 
   Configure custom quota for client-id=clientA:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 
'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' 
--entity-type clients --entity-name clientA
   Updated config for entity: client-id 'clientA'.
   </pre>
@@ -468,53 +471,53 @@
   It is possible to set default quotas for each (user, client-id), user or 
client-id group by specifying <i>--entity-default</i> option instead of 
<i>--entity-name</i>.
   <p>
   Configure default client-id quota for user=userA:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 
'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' 
--entity-type users --entity-name user1 --entity-type clients --entity-default
   Updated config for entity: user-principal 'user1', default client-id.
   </pre>
 
   Configure default quota for user:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 
'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' 
--entity-type users --entity-default
   Updated config for entity: default user-principal.
   </pre>
 
   Configure default quota for client-id:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 
'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' 
--entity-type clients --entity-default
   Updated config for entity: default client-id.
   </pre>
 
   Here's how to describe the quota for a given (user, client-id):
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type 
users --entity-name user1 --entity-type clients --entity-name clientA
   Configs for user-principal 'user1', client-id 'clientA' are 
producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
   </pre>
   Describe quota for a given user:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type 
users --entity-name user1
   Configs for user-principal 'user1' are 
producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
   </pre>
   Describe quota for a given client-id:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type 
clients --entity-name clientA
   Configs for client-id 'clientA' are 
producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
   </pre>
   If entity name is not specified, all entities of the specified type are 
described. For example, describe all users:
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type 
users
   Configs for user-principal 'user1' are 
producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
   Configs for default user-principal are 
producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
   </pre>
   Similarly for (user, client):
-  <pre>
+  <pre class="brush: bash;">
   > bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type 
users --entity-type clients
   Configs for user-principal 'user1', default client-id are 
producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
   Configs for user-principal 'user1', client-id 'clientA' are 
producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
   </pre>
   <p>
   It is possible to set default quotas that apply to all client-ids by setting 
these configs on the brokers. These properties are applied only if quota 
overrides or defaults are not configured in Zookeeper. By default, each 
client-id receives an unlimited quota. The following sets the default quota per 
producer and consumer client-id to 10MB/sec.
-  <pre>
+  <pre class="brush: text;">
     quota.producer.default=10485760
     quota.consumer.default=10485760
   </pre>
@@ -556,7 +559,7 @@
   <p>
   <h4><a id="prodconfig" href="#prodconfig">A Production Server Config</a></h4>
   Here is an example production server configuration:
-  <pre>
+  <pre class="brush: text;">
   # ZooKeeper
   zookeeper.connect=[list of ZooKeeper servers]
 
@@ -582,7 +585,7 @@
   LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer 
version) with the G1 collector. If you decide to use the G1 collector (the 
current default) and you are still on JDK 1.7, make sure you are on u51 or 
newer. LinkedIn tried out u21 in testing, but they had a number of problems 
with the GC implementation in that version.
 
   LinkedIn's tuning looks like this:
-  <pre>
+  <pre class="brush: text;">
   -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
   -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 
-XX:G1HeapRegionSize=16M
   -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
@@ -648,9 +651,7 @@
   When Pdflush cannot keep up with the rate of data being written it will 
eventually cause the writing process to block incurring latency in the writes 
to slow down the accumulation of data.
   <p>
   You can see the current state of OS memory usage by doing
-  <pre>
-    &gt; cat /proc/meminfo
-  </pre>
+  <pre class="brush: bash;"> &gt; cat /proc/meminfo </pre>
   The meaning of these values are described in the link above.
   <p>
   Using pagecache has several advantages over an in-process cache for storing 
data that will be written out to disk:

http://git-wip-us.apache.org/repos/asf/kafka/blob/57b0d0fe/docs/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/quickstart.html b/docs/quickstart.html
index 461e184..b8722a6 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -27,9 +27,9 @@ Since Kafka console scripts are different for Unix-based and 
Windows platforms,
 
 <a 
href="https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
 title="Kafka downloads">Download</a> the 0.10.2.0 release and un-tar it.
 
-<pre>
-&gt; <b>tar -xzf kafka_2.11-0.10.2.0.tgz</b>
-&gt; <b>cd kafka_2.11-0.10.2.0</b>
+<pre class="brush: bash;">
+&gt; tar -xzf kafka_2.11-0.10.2.0.tgz
+&gt; cd kafka_2.11-0.10.2.0
 </pre>
 
 <h4><a id="quickstart_startserver" href="#quickstart_startserver">Step 2: 
Start the server</a></h4>
@@ -38,15 +38,15 @@ Since Kafka console scripts are different for Unix-based 
and Windows platforms,
 Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you 
don't already have one. You can use the convenience script packaged with kafka 
to get a quick-and-dirty single-node ZooKeeper instance.
 </p>
 
-<pre>
-&gt; <b>bin/zookeeper-server-start.sh config/zookeeper.properties</b>
+<pre class="brush: bash;">
+&gt; bin/zookeeper-server-start.sh config/zookeeper.properties
 [2013-04-22 15:01:37,495] INFO Reading configuration from: 
config/zookeeper.properties 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
 ...
 </pre>
 
 <p>Now start the Kafka server:</p>
-<pre>
-&gt; <b>bin/kafka-server-start.sh config/server.properties</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-server-start.sh config/server.properties
 [2013-04-22 15:01:47,028] INFO Verifying properties 
(kafka.utils.VerifiableProperties)
 [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden 
to 1048576 (kafka.utils.VerifiableProperties)
 ...
@@ -55,13 +55,13 @@ Kafka uses ZooKeeper so you need to first start a ZooKeeper 
server if you don't
 <h4><a id="quickstart_createtopic" href="#quickstart_createtopic">Step 3: 
Create a topic</a></h4>
 
 <p>Let's create a topic named "test" with a single partition and only one 
replica:</p>
-<pre>
-&gt; <b>bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test
 </pre>
 
 <p>We can now see that topic if we run the list topic command:</p>
-<pre>
-&gt; <b>bin/kafka-topics.sh --list --zookeeper localhost:2181</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --list --zookeeper localhost:2181
 test
 </pre>
 <p>Alternatively, instead of manually creating topics you can also configure 
your brokers to auto-create topics when a non-existent topic is published 
to.</p>
@@ -72,18 +72,18 @@ test
 <p>
 Run the producer and then type a few messages into the console to send to the 
server.</p>
 
-<pre>
-&gt; <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
test</b>
-<b>This is a message</b>
-<b>This is another message</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
+This is a message
+This is another message
 </pre>
 
 <h4><a id="quickstart_consume" href="#quickstart_consume">Step 5: Start a 
consumer</a></h4>
 
 <p>Kafka also has a command line consumer that will dump out messages to 
standard output.</p>
 
-<pre>
-&gt; <b>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--topic test --from-beginning</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
test --from-beginning
 This is a message
 This is another message
 </pre>
@@ -100,15 +100,15 @@ All of the command line tools have additional options; 
running the command with
 <p>
 First we make a config file for each of the brokers (on Windows use the 
<code>copy</code> command instead):
 </p>
-<pre>
-&gt; <b>cp config/server.properties config/server-1.properties</b>
-&gt; <b>cp config/server.properties config/server-2.properties</b>
+<pre class="brush: bash;">
+&gt; cp config/server.properties config/server-1.properties
+&gt; cp config/server.properties config/server-2.properties
 </pre>
 
 <p>
 Now edit these new files and set the following properties:
 </p>
-<pre>
+<pre class="brush: text;">
 
 config/server-1.properties:
     broker.id=1
@@ -124,21 +124,21 @@ config/server-2.properties:
 <p>
 We already have Zookeeper and our single node started, so we just need to 
start the two new nodes:
 </p>
-<pre>
-&gt; <b>bin/kafka-server-start.sh config/server-1.properties &amp;</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-server-start.sh config/server-1.properties &amp;
 ...
-&gt; <b>bin/kafka-server-start.sh config/server-2.properties &amp;</b>
+&gt; bin/kafka-server-start.sh config/server-2.properties &amp;
 ...
 </pre>
 
 <p>Now create a new topic with a replication factor of three:</p>
-<pre>
-&gt; <b>bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 3 --partitions 1 --topic my-replicated-topic</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 3 --partitions 1 --topic my-replicated-topic
 </pre>
 
 <p>Okay but now that we have a cluster how can we know which broker is doing 
what? To see that run the "describe topics" command:</p>
-<pre>
-&gt; <b>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
my-replicated-topic</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
my-replicated-topic
 Topic:my-replicated-topic      PartitionCount:1        ReplicationFactor:3     
Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 1       
Replicas: 1,2,0 Isr: 1,2,0
 </pre>
@@ -152,8 +152,8 @@ Topic:my-replicated-topic   PartitionCount:1        
ReplicationFactor:3     Configs:
 <p>
 We can run the same command on the original topic we created to see where it 
is:
 </p>
-<pre>
-&gt; <b>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
test</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
 Topic:test     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
 </pre>
@@ -161,50 +161,50 @@ Topic:test        PartitionCount:1        
ReplicationFactor:1     Configs:
 <p>
 Let's publish a few messages to our new topic:
 </p>
-<pre>
-&gt; <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
my-replicated-topic</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
my-replicated-topic
 ...
-<b>my test message 1</b>
-<b>my test message 2</b>
-<b>^C</b>
+my test message 1
+my test message 2
+^C
 </pre>
 <p>Now let's consume these messages:</p>
-<pre>
-&gt; <b>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--from-beginning --topic my-replicated-topic</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--from-beginning --topic my-replicated-topic
 ...
 my test message 1
 my test message 2
-<b>^C</b>
+^C
 </pre>
 
 <p>Now let's test out fault-tolerance. Broker 1 was acting as the leader so 
let's kill it:</p>
-<pre>
-&gt; <b>ps aux | grep server-1.properties</b>
-<i>7564</i> ttys002    0:15.91 
/System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
-&gt; <b>kill -9 7564</b>
+<pre class="brush: bash;">
+&gt; ps aux | grep server-1.properties
+7564 ttys002    0:15.91 
/System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
+&gt; kill -9 7564
 </pre>
 
 On Windows use:
-<pre>
-&gt; <b>wmic process get processid,caption,commandline | find "java.exe" | 
find "server-1.properties"</b>
-java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... 
build\libs\kafka_2.11-0.10.2.0.jar"  kafka.Kafka config\server-1.properties    
<i>644</i>
-&gt; <b>taskkill /pid 644 /f</b>
+<pre class="brush: bash;">
+&gt; wmic process get processid,caption,commandline | find "java.exe" | find 
"server-1.properties"
+java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... 
build\libs\kafka_2.11-0.10.2.0.jar"  kafka.Kafka config\server-1.properties    
644
+&gt; taskkill /pid 644 /f
 </pre>
 
 <p>Leadership has switched to one of the slaves and node 1 is no longer in the 
in-sync replica set:</p>
 
-<pre>
-&gt; <b>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
my-replicated-topic</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
my-replicated-topic
 Topic:my-replicated-topic      PartitionCount:1        ReplicationFactor:3     
Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       
Replicas: 1,2,0 Isr: 2,0
 </pre>
 <p>But the messages are still available for consumption even though the leader 
that took the writes originally is down:</p>
-<pre>
-&gt; <b>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--from-beginning --topic my-replicated-topic</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--from-beginning --topic my-replicated-topic
 ...
 my test message 1
 my test message 2
-<b>^C</b>
+^C
 </pre>
 
 
@@ -221,8 +221,8 @@ Kafka topic to a file.</p>
 
 <p>First, we'll start by creating some seed data to test with:</p>
 
-<pre>
-&gt; <b>echo -e "foo\nbar" > test.txt</b>
+<pre class="brush: bash;">
+&gt; echo -e "foo\nbar" > test.txt
 </pre>
 
 <p>Next, we'll start two connectors running in <i>standalone</i> mode, which 
means they run in a single, local, dedicated
@@ -231,8 +231,8 @@ process, containing common configuration such as the Kafka 
brokers to connect to
 The remaining configuration files each specify a connector to create. These 
files include a unique connector name, the connector
 class to instantiate, and any other configuration required by the 
connector.</p>
 
-<pre>
-&gt; <b>bin/connect-standalone.sh config/connect-standalone.properties 
config/connect-file-source.properties config/connect-file-sink.properties</b>
+<pre class="brush: bash;">
+&gt; bin/connect-standalone.sh config/connect-standalone.properties 
config/connect-file-source.properties config/connect-file-sink.properties
 </pre>
 
 <p>
@@ -250,8 +250,8 @@ by examining the contents of the output file:
 </p>
 
 
-<pre>
-&gt; <b>cat test.sink.txt</b>
+<pre class="brush: bash;">
+&gt; cat test.sink.txt
 foo
 bar
 </pre>
@@ -262,8 +262,8 @@ data in the topic (or use custom consumer code to process 
it):
 </p>
 
 
-<pre>
-&gt; <b>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--topic connect-test --from-beginning</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
connect-test --from-beginning
 {"schema":{"type":"string","optional":false},"payload":"foo"}
 {"schema":{"type":"string","optional":false},"payload":"bar"}
 ...
@@ -271,8 +271,8 @@ data in the topic (or use custom consumer code to process 
it):
 
 <p>The connectors continue to process data, so we can add data to the file and 
see it move through the pipeline:</p>
 
-<pre>
-&gt; <b>echo "Another line" >> test.txt</b>
+<pre class="brush: bash;">
+&gt; echo "Another line" >> test.txt
 </pre>
 
 <p>You should see the line appear in the console consumer output and in the 
sink file.</p>
@@ -284,7 +284,7 @@ Kafka Streams is a client library of Kafka for real-time 
stream processing and a
 This quickstart example will demonstrate how to run a streaming application 
coded in this library. Here is the gist
 of the <code><a 
href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java";>WordCountDemo</a></code>
 example code (converted to use Java 8 lambda expressions for easy reading).
 </p>
-<pre>
+<pre class="brush: bash;">
 // Serializers/deserializers (serde) for String and Long types
 final Serde&lt;String&gt; stringSerde = Serdes.String();
 final Serde&lt;Long&gt; longSerde = Serdes.Long();
@@ -333,14 +333,14 @@ As the first step, we will prepare input data to a Kafka 
topic, which will subse
 
 -->
 
-<pre>
-&gt; <b>echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka 
summit" > file-input.txt</b>
+<pre class="brush: bash;">
+&gt; echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka 
summit" > file-input.txt
 </pre>
 Or on Windows:
-<pre>
-&gt; <b>echo all streams lead to kafka> file-input.txt</b>
-&gt; <b>echo hello kafka streams>> file-input.txt</b>
-&gt; <b>echo|set /p=join kafka summit>> file-input.txt</b>
+<pre class="brush: bash;">
+&gt; echo all streams lead to kafka> file-input.txt
+&gt; echo hello kafka streams>> file-input.txt
+&gt; echo|set /p=join kafka summit>> file-input.txt
 </pre>
 
 <p>
@@ -349,25 +349,25 @@ which reads the data from STDIN line-by-line, and 
publishes each line as a separ
 stream data will likely be flowing continuously into Kafka where the 
application will be up and running):
 </p>
 
-<pre>
-&gt; <b>bin/kafka-topics.sh --create \</b>
-            <b>--zookeeper localhost:2181 \</b>
-            <b>--replication-factor 1 \</b>
-            <b>--partitions 1 \</b>
-            <b>--topic streams-file-input</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-topics.sh --create \
+    --zookeeper localhost:2181 \
+    --replication-factor 1 \
+    --partitions 1 \
+    --topic streams-file-input
 </pre>
 
 
-<pre>
-&gt; <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-file-input < file-input.txt</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
streams-file-input < file-input.txt
 </pre>
 
 <p>
 We can now run the WordCount demo application to process the input data:
 </p>
 
-<pre>
-&gt; <b>bin/kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountDemo</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountDemo
 </pre>
 
 <p>
@@ -380,22 +380,22 @@ The demo will run for a few seconds and then, unlike 
typical stream processing a
 We can now inspect the output of the WordCount demo application by reading 
from its output topic:
 </p>
 
-<pre>
-&gt; <b>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \</b>
-            <b>--topic streams-wordcount-output \</b>
-            <b>--from-beginning \</b>
-            <b>--formatter kafka.tools.DefaultMessageFormatter \</b>
-            <b>--property print.key=true \</b>
-            <b>--property print.value=true \</b>
-            <b>--property 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \</b>
-            <b>--property 
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer</b>
+<pre class="brush: bash;">
+&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
+    --topic streams-wordcount-output \
+    --from-beginning \
+    --formatter kafka.tools.DefaultMessageFormatter \
+    --property print.key=true \
+    --property print.value=true \
+    --property 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
+    --property 
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 </pre>
 
 <p>
 with the following output data being printed to the console:
 </p>
 
-<pre>
+<pre class="brush: bash;">
 all     1
 lead    1
 to      1

http://git-wip-us.apache.org/repos/asf/kafka/blob/57b0d0fe/docs/security.html
----------------------------------------------------------------------
diff --git a/docs/security.html b/docs/security.html
index b2d4759..09a6c33 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -42,7 +42,7 @@
         <li><h4><a id="security_ssl_key" href="#security_ssl_key">Generate SSL 
key and certificate for each Kafka broker</a></h4>
             The first step of deploying HTTPS is to generate the key and the 
certificate for each machine in the cluster. You can use Java's keytool utility 
to accomplish this task.
             We will generate the key into a temporary keystore initially so 
that we can export and sign it later with CA.
-            <pre>
+            <pre class="brush: bash;">
             keytool -keystore server.keystore.jks -alias localhost -validity 
{validity} -genkey</pre>
 
             You need to specify two parameters in the above command:
@@ -53,7 +53,7 @@
             <br>
         Note: By default the property 
<code>ssl.endpoint.identification.algorithm</code> is not defined, so hostname 
verification is not performed. In order to enable hostname verification, set 
the following property:
 
-        <pre>  ssl.endpoint.identification.algorithm=HTTPS </pre>
+        <pre class="brush: text;">     
ssl.endpoint.identification.algorithm=HTTPS </pre>
 
         Once enabled, clients will verify the server's fully qualified domain 
name (FQDN) against one of the following two fields:
         <ol>
@@ -62,43 +62,43 @@
         </ol>
         <br>
         Both fields are valid, RFC-2818 recommends the use of SAN however. SAN 
is also more flexible, allowing for multiple DNS entries to be declared. 
Another advantage is that the CN can be set to a more meaningful value for 
authorization purposes. To add a SAN field  append the following argument 
<code> -ext SAN=DNS:{FQDN} </code> to the keytool command:
-        <pre>
+        <pre class="brush: bash;">
         keytool -keystore server.keystore.jks -alias localhost -validity 
{validity} -genkey -ext SAN=DNS:{FQDN}
         </pre>
         The following command can be run afterwards to verify the contents of 
the generated certificate:
-        <pre>
+        <pre class="brush: bash;">
         keytool -list -v -keystore server.keystore.jks
         </pre>
         </li>
         <li><h4><a id="security_ssl_ca" href="#security_ssl_ca">Creating your 
own CA</a></h4>
             After the first step, each machine in the cluster has a 
public-private key pair, and a certificate to identify the machine. The 
certificate, however, is unsigned, which means that an attacker can create such 
a certificate to pretend to be any machine.<p>
             Therefore, it is important to prevent forged certificates by 
signing them for each machine in the cluster. A certificate authority (CA) is 
responsible for signing certificates. CA works likes a government that issues 
passports—the government stamps (signs) each passport so that the passport 
becomes difficult to forge. Other governments verify the stamps to ensure the 
passport is authentic. Similarly, the CA signs the certificates, and the 
cryptography guarantees that a signed certificate is computationally difficult 
to forge. Thus, as long as the CA is a genuine and trusted authority, the 
clients have high assurance that they are connecting to the authentic machines.
-            <pre>
-            openssl req <b>-new</b> -x509 -keyout ca-key -out ca-cert -days 
365</pre>
+            <pre class="brush: bash;">
+            openssl req -new -x509 -keyout ca-key -out ca-cert -days 365</pre>
 
             The generated CA is simply a public-private key pair and 
certificate, and it is intended to sign other certificates.<br>
 
             The next step is to add the generated CA to the **clients' 
truststore** so that the clients can trust this CA:
-            <pre>
+            <pre class="brush: bash;">
             keytool -keystore client.truststore.jks -alias CARoot -import 
-file ca-cert</pre>
 
             <b>Note:</b> If you configure the Kafka brokers to require client 
authentication by setting ssl.client.auth to be "requested" or "required" on 
the <a href="#config_broker">Kafka brokers config</a> then you must provide a 
truststore for the Kafka brokers as well and it should have all the CA 
certificates that clients' keys were signed by.
-            <pre>
-            keytool -keystore server.truststore.jks -alias CARoot 
<b>-import</b> -file ca-cert</pre>
+            <pre class="brush: bash;">
+            keytool -keystore server.truststore.jks -alias CARoot -import 
-file ca-cert</pre>
 
             In contrast to the keystore in step 1 that stores each machine's 
own identity, the truststore of a client stores all the certificates that the 
client should trust. Importing a certificate into one's truststore also means 
trusting all certificates that are signed by that certificate. As the analogy 
above, trusting the government (CA) also means trusting all passports 
(certificates) that it has issued. This attribute is called the chain of trust, 
and it is particularly useful when deploying SSL on a large Kafka cluster. You 
can sign all certificates in the cluster with a single CA, and have all 
machines share the same truststore that trusts the CA. That way all machines 
can authenticate all other machines.</li>
 
         <li><h4><a id="security_ssl_signing" 
href="#security_ssl_signing">Signing the certificate</a></h4>
             The next step is to sign all certificates generated by step 1 with 
the CA generated in step 2. First, you need to export the certificate from the 
keystore:
-            <pre>
+            <pre class="brush: bash;">
             keytool -keystore server.keystore.jks -alias localhost -certreq 
-file cert-file</pre>
 
             Then sign it with the CA:
-            <pre>
+            <pre class="brush: bash;">
             openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out 
cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}</pre>
 
             Finally, you need to import both the certificate of the CA and the 
signed certificate into the keystore:
-            <pre>
+            <pre class="brush: bash;">
             keytool -keystore server.keystore.jks -alias CARoot -import -file 
ca-cert
             keytool -keystore server.keystore.jks -alias localhost -import 
-file cert-signed</pre>
 
@@ -132,11 +132,11 @@
             <pre>listeners</pre>
 
             If SSL is not enabled for inter-broker communication (see below 
for how to enable it), both PLAINTEXT and SSL ports will be necessary.
-            <pre>
+            <pre class="brush: text;">
             listeners=PLAINTEXT://host.name:port,SSL://host.name:port</pre>
 
             Following SSL configs are needed on the broker side
-            <pre>
+            <pre class="brush: text;">
             ssl.keystore.location=/var/private/ssl/server.keystore.jks
             ssl.keystore.password=test1234
             ssl.key.password=test1234
@@ -189,7 +189,7 @@
         <li><h4><a id="security_configclients" 
href="#security_configclients">Configuring Kafka Clients</a></h4>
             SSL is supported only for the new Kafka Producer and Consumer, the 
older API is not supported. The configs for SSL will be the same for both 
producer and consumer.<br>
             If client authentication is not required in the broker, then the 
following is a minimal configuration example:
-            <pre>
+            <pre class="brush: text;">
             security.protocol=SSL
             ssl.truststore.location=/var/private/ssl/client.truststore.jks
             ssl.truststore.password=test1234</pre>
@@ -197,7 +197,7 @@
             Note: ssl.truststore.password is technically optional but highly 
recommended. If a password is not set access to the truststore is still 
available, but integrity checking is disabled.
 
             If client authentication is required, then a keystore must be 
created like in step 1 and the following must also be configured:
-            <pre>
+            <pre class="brush: text;">
             ssl.keystore.location=/var/private/ssl/client.keystore.jks
             ssl.keystore.password=test1234
             ssl.key.password=test1234</pre>
@@ -212,7 +212,7 @@
                 </ol>
     <br>
             Examples using console-producer and console-consumer:
-            <pre>
+            <pre class="brush: bash;">
             kafka-console-producer.sh --broker-list localhost:9093 --topic 
test --producer.config client-ssl.properties
             kafka-console-consumer.sh --bootstrap-server localhost:9093 
--topic test --consumer.config client-ssl.properties</pre>
         </li>
@@ -278,7 +278,7 @@
                     <a href="#security_sasl_scram_clientconfig">SCRAM</a>.
                     For example, <a 
href="#security_sasl_gssapi_clientconfig">GSSAPI</a>
                     credentials may be configured as:
-                    <pre>
+                    <pre class="brush: text;">
         KafkaClient {
         com.sun.security.auth.module.Krb5LoginModule required
         useKeyTab=true
@@ -288,7 +288,7 @@
     };</pre>
                 </li>
                 <li>Pass the JAAS config file location as JVM parameter to 
each client JVM. For example:
-                    <pre>    
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</pre></li>
+                    <pre class="brush: bash;">    
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</pre></li>
        </ol>
                 </li>
             </ol>
@@ -350,7 +350,7 @@
             <li><b>Create Kerberos Principals</b><br>
             If you are using the organization's Kerberos or Active Directory 
server, ask your Kerberos administrator for a principal for each Kafka broker 
in your cluster and for every operating system user that will access Kafka with 
Kerberos authentication (via clients and tools).</br>
             If you have installed your own Kerberos, you will need to create 
these principals yourself using the following commands:
-                <pre>
+                <pre class="brush: bash;">
         sudo /usr/sbin/kadmin.local -q 'addprinc -randkey 
kafka/{hostname}@{REALM}'
         sudo /usr/sbin/kadmin.local -q "ktadd -k 
/etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"</pre></li>
             <li><b>Make sure all hosts can be reachable using hostnames</b> - 
it is a Kerberos requirement that all your hosts can be resolved with their 
FQDNs.</li>
@@ -358,7 +358,7 @@
         <li><h5><a id="security_sasl_kerberos_brokerconfig" 
href="#security_sasl_kerberos_brokerconfig">Configuring Kafka Brokers</a></h5>
         <ol>
             <li>Add a suitably modified JAAS file similar to the one below to 
each Kafka broker's config directory, let's call it kafka_server_jaas.conf for 
this example (note that each broker should have its own keytab):
-            <pre>
+            <pre class="brush: text;">
         KafkaServer {
             com.sun.security.auth.module.Krb5LoginModule required
             useKeyTab=true
@@ -384,7 +384,7 @@
         
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre>
             </li>
             <li>Make sure the keytabs configured in the JAAS file are readable 
by the operating system user who is starting kafka broker.</li>
-            <li>Configure SASL port and SASL mechanisms in server.properties 
as described <a href="#security_sasl_brokerconfig">here</a>.</pre> For example:
+            <li>Configure SASL port and SASL mechanisms in server.properties 
as described <a href="#security_sasl_brokerconfig">here</a>. For example:
             <pre>    listeners=SASL_PLAINTEXT://host.name:port
         security.inter.broker.protocol=SASL_PLAINTEXT
         sasl.mechanism.inter.broker.protocol=GSSAPI
@@ -422,7 +422,6 @@
                    as described <a 
href="#security_client_staticjaas">here</a>. Clients use the login section named
                    <tt>KafkaClient</tt>. This option allows only one user for 
all client connections from a JVM.</li>
                 <li>Make sure the keytabs configured in the JAAS configuration 
are readable by the operating system user who is starting kafka client.</li>
-                </li>
                 <li>Optionally pass the krb5 file locations as JVM parameters 
to each client JVM (see <a 
href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html";>here</a>
 for more details):
                 <pre>    
-Djava.security.krb5.conf=/etc/kafka/krb5.conf</pre></li>
                 <li>Configure the following properties in producer.properties 
or consumer.properties:
@@ -443,7 +442,7 @@
         <li><h5><a id="security_sasl_plain_brokerconfig" 
href="#security_sasl_plain_brokerconfig">Configuring Kafka Brokers</a></h5>
             <ol>
             <li>Add a suitably modified JAAS file similar to the one below to 
each Kafka broker's config directory, let's call it kafka_server_jaas.conf for 
this example:
-                <pre>
+                <pre class="brush: text;">
         KafkaServer {
             org.apache.kafka.common.security.plain.PlainLoginModule required
             username="admin"
@@ -458,7 +457,7 @@
                 those from other brokers using these properties.</li>
             <li>Pass the JAAS config file location as JVM parameter to each 
Kafka broker:
                 <pre>    
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre></li>
-            <li>Configure SASL port and SASL mechanisms in server.properties 
as described <a href="#security_sasl_brokerconfig">here</a>.</pre> For example:
+            <li>Configure SASL port and SASL mechanisms in server.properties 
as described <a href="#security_sasl_brokerconfig">here</a>. For example:
                 <pre>    listeners=SASL_SSL://host.name:port
         security.inter.broker.protocol=SASL_SSL
         sasl.mechanism.inter.broker.protocol=PLAIN
@@ -472,7 +471,7 @@
             <li>Configure the JAAS configuration property for each client in 
producer.properties or consumer.properties.
                 The login module describes how the clients like producer and 
consumer can connect to the Kafka Broker.
                 The following is an example configuration for a client for the 
PLAIN mechanism:
-                <pre>
+                <pre class="brush: text;">
     sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required \
         username="alice" \
         password="alice-secret";</pre>
@@ -538,23 +537,23 @@
         before Kafka brokers are started. Client credentials may be created 
and updated dynamically and updated
         credentials will be used to authenticate new connections.</p>
         <p>Create SCRAM credentials for user <i>alice</i> with password 
<i>alice-secret</i>:
-        <pre>
-    bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 
'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]'
 --entity-type users --entity-name alice
+        <pre class="brush: bash;">
+    > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 
'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]'
 --entity-type users --entity-name alice
         </pre>
         <p>The default iteration count of 4096 is used if iterations are not 
specified. A random salt is created
         and the SCRAM identity consisting of salt, iterations, StoredKey and 
ServerKey are stored in Zookeeper.
         See <a href="https://tools.ietf.org/html/rfc5802";>RFC 5802</a> for 
details on SCRAM identity and the individual fields.
         <p>The following examples also require a user <i>admin</i> for 
inter-broker communication which can be created using:
-        <pre>
-    bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 
'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' 
--entity-type users --entity-name admin
+        <pre class="brush: bash;">
+    > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 
'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' 
--entity-type users --entity-name admin
         </pre>
         <p>Existing credentials may be listed using the <i>--describe</i> 
option:
-        <pre>
-   bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type 
users --entity-name alice
+        <pre class="brush: bash;">
+    > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type 
users --entity-name alice
         </pre>
         <p>Credentials may be deleted for one or more SCRAM mechanisms using 
the <i>--delete</i> option:
-        <pre>
-   bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 
'SCRAM-SHA-512' --entity-type users --entity-name alice
+        <pre class="brush: bash;">
+    > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 
'SCRAM-SHA-512' --entity-type users --entity-name alice
         </pre>
         </li>
         <li><h5><a id="security_sasl_scram_brokerconfig" 
href="#security_sasl_scram_brokerconfig">Configuring Kafka Brokers</a></h5>
@@ -586,7 +585,7 @@
            <li>Configure the JAAS configuration property for each client in 
producer.properties or consumer.properties.
                 The login module describes how the clients like producer and 
consumer can connect to the Kafka Broker.
                The following is an example configuration for a client for the 
SCRAM mechanisms:
-                <pre>
+                <pre class="brush: text;">
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule 
required \
         username="alice" \
         password="alice-secret";</pre>
@@ -599,7 +598,6 @@
                 <p>JAAS configuration for clients may alternatively be 
specified as a JVM parameter similar to brokers
                 as described <a href="#security_client_staticjaas">here</a>. 
Clients use the login section named
                 <tt>KafkaClient</tt>. This option allows only one user for all 
client connections from a JVM.</p></li>
-            </li>
             <li>Configure the following properties in producer.properties or 
consumer.properties:
                 <pre>
     security.protocol=SASL_SSL
@@ -791,25 +789,25 @@
     <ul>
         <li><b>Adding Acls</b><br>
     Suppose you want to add an acl "Principals User:Bob and User:Alice are 
allowed to perform Operation Read and Write on Topic Test-Topic from IP 
198.51.100.0 and IP 198.51.100.1". You can do that by executing the CLI with 
following options:
-            <pre>bin/kafka-acls.sh --authorizer-properties 
zookeeper.connect=localhost:2181 --add --allow-principal User:Bob 
--allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 
198.51.100.1 --operation Read --operation Write --topic Test-topic</pre>
+            <pre class="brush: bash;">bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic</pre>
             By default, all principals that don't have an explicit acl that 
allows access for an operation to a resource are denied. In rare cases where an 
allow acl is defined that allows access to all but some principal we will have 
to use the --deny-principal and --deny-host option. For example, if we want to 
allow all users to Read from Test-topic but only deny User:BadBob from IP 
198.51.100.3 we can do so using following commands:
-            <pre>bin/kafka-acls.sh --authorizer-properties 
zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * 
--deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic 
Test-topic</pre>
+            <pre class="brush: bash;">bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:* --allow-host * --deny-principal User:BadBob 
--deny-host 198.51.100.3 --operation Read --topic Test-topic</pre>
             Note that ``--allow-host`` and ``deny-host`` only support IP 
addresses (hostnames are not supported).
             Above examples add acls to a topic by specifying --topic 
[topic-name] as the resource option. Similarly user can add acls to cluster by 
specifying --cluster and to a consumer group by specifying --group 
[group-name].</li>
 
         <li><b>Removing Acls</b><br>
                 Removing acls is pretty much the same. The only difference is 
instead of --add option users will have to specify --remove option. To remove 
the acls added by the first example above we can execute the CLI with following 
options:
-            <pre> bin/kafka-acls.sh --authorizer-properties 
zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob 
--allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 
198.51.100.1 --operation Read --operation Write --topic Test-topic </pre></li>
+            <pre class="brush: bash;"> bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --remove 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic </pre></li>
 
         <li><b>List Acls</b><br>
                 We can list acls for any resource by specifying the --list 
option with the resource. To list all acls for Test-topic we can execute the 
CLI with following options:
-                <pre>bin/kafka-acls.sh --authorizer-properties 
zookeeper.connect=localhost:2181 --list --topic Test-topic</pre></li>
+                <pre class="brush: bash;">bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --list --topic 
Test-topic</pre></li>
 
         <li><b>Adding or removing a principal as producer or consumer</b><br>
                 The most common use case for acl management are 
adding/removing a principal as producer or consumer so we added convenience 
options to handle these cases. In order to add User:Bob as a producer of  
Test-topic we can execute the following command:
-            <pre> bin/kafka-acls.sh --authorizer-properties 
zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer 
--topic Test-topic</pre>
+            <pre class="brush: bash;"> bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:Bob --producer --topic Test-topic</pre>
                 Similarly to add Alice as a consumer of Test-topic with 
consumer group Group-1 we just have to pass --consumer option:
-            <pre> bin/kafka-acls.sh --authorizer-properties 
zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer 
--topic Test-topic --group Group-1 </pre>
+            <pre class="brush: bash;"> bin/kafka-acls.sh 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre>
                 Note that for consumer option we must also specify the 
consumer group.
                 In order to remove a principal from producer or consumer role 
we just need to pass --remove option. </li>
         </ul>
@@ -835,50 +833,58 @@
         <p></p>
         As an example, say we wish to encrypt both broker-client and 
broker-broker communication with SSL. In the first incremental bounce, a SSL 
port is opened on each node:
             <pre>
-            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092</pre>
+            listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+            </pre>
 
         We then restart the clients, changing their config to point at the 
newly opened, secured port:
 
             <pre>
             bootstrap.servers = [broker1:9092,...]
             security.protocol = SSL
-            ...etc</pre>
+            ...etc
+            </pre>
 
         In the second incremental server bounce we instruct Kafka to use SSL 
as the broker-broker protocol (which will use the same SSL port):
 
             <pre>
             listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
-            security.inter.broker.protocol=SSL</pre>
+            security.inter.broker.protocol=SSL
+            </pre>
 
         In the final bounce we secure the cluster by closing the PLAINTEXT 
port:
 
             <pre>
             listeners=SSL://broker1:9092
-            security.inter.broker.protocol=SSL</pre>
+            security.inter.broker.protocol=SSL
+            </pre>
 
         Alternatively we might choose to open multiple ports so that different 
protocols can be used for broker-broker and broker-client communication. Say we 
wished to use SSL encryption throughout (i.e. for broker-broker and 
broker-client communication) but we'd like to add SASL authentication to the 
broker-client connection also. We would achieve this by opening two additional 
ports during the first bounce:
 
             <pre>
-            
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093</pre>
+            
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+            </pre>
 
         We would then restart the clients, changing their config to point at 
the newly opened, SASL & SSL secured port:
 
             <pre>
             bootstrap.servers = [broker1:9093,...]
             security.protocol = SASL_SSL
-            ...etc</pre>
+            ...etc
+            </pre>
 
         The second server bounce would switch the cluster to use encrypted 
broker-broker communication via the SSL port we previously opened on port 9092:
 
             <pre>
             
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
-            security.inter.broker.protocol=SSL</pre>
+            security.inter.broker.protocol=SSL
+            </pre>
 
         The final bounce secures the cluster by closing the PLAINTEXT port.
 
             <pre>
         listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
-        security.inter.broker.protocol=SSL</pre>
+        security.inter.broker.protocol=SSL
+            </pre>
 
         ZooKeeper can be secured independently of the Kafka cluster. The steps 
for doing this are covered in section <a href="#zk_authz_migration">7.6.2</a>.
 
@@ -907,11 +913,11 @@
         <li>Perform a second rolling restart of brokers, this time omitting 
the system property that sets the JAAS login file</li>
     </ol>
     Here is an example of how to run the migration tool:
-    <pre>
+    <pre class="brush: bash;">
     ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure 
--zookeeper.connect=localhost:2181
     </pre>
     <p>Run this to see the full list of parameters:</p>
-    <pre>
+    <pre class="brush: bash;">
     ./bin/zookeeper-security-migration.sh --help
     </pre>
     <h4><a id="zk_authz_ensemble" href="#zk_authz_ensemble">7.6.3 Migrating 
the ZooKeeper ensemble</a></h4>

Reply via email to