STORM-697: Review feedback: Fixed missing or misplaced licenses. Added a more 
verbose explanation of MessageMetadataSchemeAsMultiScheme in the README


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c0c830c1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c0c830c1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c0c830c1

Branch: refs/heads/master
Commit: c0c830c1e41059a39a61b6563d77ecab5f333186
Parents: 25e7bc4
Author: matt.tieman <matt.tie...@inin.com>
Authored: Wed Oct 28 09:10:53 2015 -0400
Committer: matt.tieman <matt.tie...@inin.com>
Committed: Wed Oct 28 09:10:53 2015 -0400

----------------------------------------------------------------------
 external/storm-kafka/README.md                  | 44 ++++++++++++--------
 .../jvm/storm/kafka/MessageMetadataScheme.java  | 10 ++---
 .../MessageMetadataSchemeAsMultiScheme.java     | 17 ++++++++
 .../kafka/StringMessageAndMetadataScheme.java   | 17 ++++++++
 4 files changed, 65 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c0c830c1/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 1bf14b7..04fb96c 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -6,16 +6,16 @@ Provides core Storm and Trident spout implementations for 
consuming data from Ap
 ##Spouts
 We support both Trident and core Storm spouts. For both spout implementations, 
we use a BrokerHost interface that
 tracks Kafka broker host to partition mapping and kafkaConfig that controls 
some Kafka related parameters.
- 
+
 ###BrokerHosts
-In order to initialize your Kafka spout/emitter you need to construct an 
instance of the marker interface BrokerHosts. 
+In order to initialize your Kafka spout/emitter you need to construct an 
instance of the marker interface BrokerHosts.
 Currently, we support the following two implementations:
 
 ####ZkHosts
-ZkHosts is what you should use if you want to dynamically track Kafka broker 
to partition mapping. This class uses 
+ZkHosts is what you should use if you want to dynamically track Kafka broker 
to partition mapping. This class uses
 Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can 
instantiate an object by calling
 ```java
-    public ZkHosts(String brokerZkStr, String brokerZkPath) 
+    public ZkHosts(String brokerZkStr, String brokerZkPath)
     public ZkHosts(String brokerZkStr)
 ```
 Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the 
root directory under which all the topics and
@@ -40,7 +40,7 @@ of this class, you need to first construct an instance of 
GlobalPartitionInforma
 ```
 
 ###KafkaConfig
-The second thing needed for constructing a kafkaSpout is an instance of 
KafkaConfig. 
+The second thing needed for constructing a kafkaSpout is an instance of 
KafkaConfig.
 ```java
     public KafkaConfig(BrokerHosts hosts, String topic)
     public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
@@ -103,9 +103,17 @@ also controls the naming of your output field.
   public Fields getOutputFields();
 ```
 
-The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with 
`byte[]` as is. The name of the
-outputField is "bytes".  There are alternative implementations like 
`SchemeAsMultiScheme`,
-`KeyValueSchemeAsMultiScheme`, and `MessageMetadataSchemeAsMultiScheme` which 
can convert the `byte[]` to `String`.
+The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with 
`byte[]` as is. The name of the outputField is "bytes". There are alternative 
implementations like `SchemeAsMultiScheme` and `KeyValueSchemeAsMultiScheme` 
which can convert the `byte[]` to `String`.
+
+There is also an extension of `SchemeAsMultiScheme`, 
`MessageMetadataSchemeAsMultiScheme`,
+which has an additional deserialize method that accepts the message `byte[]` 
in addition to the `Partition` and `offset` associated with the message.
+
+```java
+public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, 
Partition partition, long offset)
+
+```
+
+This is useful for auditing/replaying messages from arbitrary points on a 
Kafka topic, saving the partition and offset of each message of a discrete 
stream instead of persisting the entire message.
 
 
 ### Examples
@@ -184,7 +192,7 @@ use Kafka 0.8.1.1 built against Scala 2.10, you would use 
the following dependen
 Note that the ZooKeeper and log4j dependencies are excluded to prevent version 
conflicts with Storm's dependencies.
 
 ##Writing to Kafka as part of your topology
-You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a 
component to your topology or if you 
+You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a 
component to your topology or if you
 are using trident you can use storm.kafka.trident.TridentState, 
storm.kafka.trident.TridentStateFactory and
 storm.kafka.trident.TridentKafkaUpdater.
 
@@ -199,9 +207,9 @@ These interfaces have 2 methods defined:
 ```
 
 As the name suggests, these methods are called to map a tuple to Kafka key and 
Kafka message. If you just want one field
-as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java 
-implementation. In the KafkaBolt, the implementation always looks for a field 
with field name "key" and "message" if you 
-use the default constructor to construct FieldNameBasedTupleToKafkaMapper for 
backward compatibility 
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a field 
with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper for 
backward compatibility
 reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
 In the TridentKafkaState you must specify what is the field name for key and 
message as there is no default constructor.
 These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
@@ -213,12 +221,12 @@ public interface KafkaTopicSelector {
     String getTopics(Tuple/TridentTuple tuple);
 }
 ```
-The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published 
-You can return a null and the message will be ignored. If you have one static 
topic name then you can use 
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one static 
topic name then you can use
 DefaultTopicSelector.java and set the name of the topic in the constructor.
 
 ### Specifying Kafka producer properties
-You can provide all the produce properties , see 
http://kafka.apache.org/documentation.html#newproducerconfigs 
+You can provide all the produce properties , see 
http://kafka.apache.org/documentation.html#newproducerconfigs
 section "Important configuration properties for the producer", in your Storm 
topology config by setting the properties
 map with key kafka.broker.properties.
 
@@ -227,7 +235,7 @@ map with key kafka.broker.properties.
 For the bolt :
 ```java
         TopologyBuilder builder = new TopologyBuilder();
-    
+
         Fields fields = new Fields("key", "message");
         FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                     new Values("storm", "1"),
@@ -241,7 +249,7 @@ For the bolt :
                 .withTopicSelector(new DefaultTopicSelector("test"))
                 .withTupleToKafkaMapper(new 
FieldNameBasedTupleToKafkaMapper());
         builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
-        
+
         Config conf = new Config();
         //set producer properties.
         Properties props = new Properties();
@@ -250,7 +258,7 @@ For the bolt :
         props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
         conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
-        
+
         StormSubmitter.submitTopology("kafkaboltTest", conf, 
builder.createTopology());
 ```
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c0c830c1/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
index da7acbf..92a5598 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
@@ -1,8 +1,3 @@
-package storm.kafka;
-
-import java.util.List;
-import backtype.storm.spout.Scheme;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,6 +15,11 @@ import backtype.storm.spout.Scheme;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package storm.kafka;
+
+import java.util.List;
+import backtype.storm.spout.Scheme;
+
 public interface MessageMetadataScheme extends Scheme {
     public List<Object> deserializeMessageWithMetadata(byte[] message, 
Partition partition, long offset);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c0c830c1/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
index e89e391..0567809 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package storm.kafka;
 
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/storm/blob/c0c830c1/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
index 2dc4c02..031d497 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package storm.kafka;
 
 import java.util.List;
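
----------------------------------------------------------------------
Illustration (not part of the commit): the README change describes a
deserialize method that receives the message `byte[]` together with its
`Partition` and `offset`. The sketch below shows one way such a scheme could
build a tuple from those three inputs. It is a minimal, self-contained
approximation: `PartitionSketch`, `MetadataSchemeSketch`,
`StringWithMetadataSketch` and `MetadataSchemeDemo` are hypothetical stand-ins
invented here so the example compiles without Storm on the classpath. They are
not the classes touched by this commit, and the UTF-8 decoding and field order
are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for storm.kafka.Partition; only a partition number is modeled.
final class PartitionSketch {
    final int partition;

    PartitionSketch(int partition) {
        this.partition = partition;
    }
}

// Mirrors the shape of the method shown in the diff, using the stand-in type above.
interface MetadataSchemeSketch {
    List<Object> deserializeMessageWithMetadata(byte[] message, PartitionSketch partition, long offset);
}

// Assumed behavior: decode the payload as UTF-8, then append partition number and offset.
final class StringWithMetadataSketch implements MetadataSchemeSketch {
    @Override
    public List<Object> deserializeMessageWithMetadata(byte[] message, PartitionSketch partition, long offset) {
        String payload = new String(message, StandardCharsets.UTF_8);
        // The int and long are boxed so the tuple is a plain List<Object>.
        return Arrays.<Object>asList(payload, partition.partition, offset);
    }
}

class MetadataSchemeDemo {
    public static void main(String[] args) {
        MetadataSchemeSketch scheme = new StringWithMetadataSketch();
        List<Object> tuple = scheme.deserializeMessageWithMetadata(
                "hello".getBytes(StandardCharsets.UTF_8), new PartitionSketch(3), 42L);
        System.out.println(tuple); // prints [hello, 3, 42]
    }
}
```

With the partition and offset carried in each tuple, a downstream component
could persist only those two values per message, which matches the
auditing/replay use case the README mentions.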
