Repository: storm
Updated Branches:
  refs/heads/master 6a21b6a4e -> 9b2fd72e9


Correctly hash byte array tuple values


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b132520a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b132520a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b132520a

Branch: refs/heads/master
Commit: b132520adb00def6d03ef020b1cb5973eb3b3519
Parents: 98cbb34
Author: Derek Dagit <der...@yahoo-inc.com>
Authored: Fri Jul 17 18:45:39 2015 -0500
Committer: Derek Dagit <der...@yahoo-inc.com>
Committed: Fri Jul 17 18:45:39 2015 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/task.clj      |  1 -
 storm-core/src/clj/backtype/storm/tuple.clj     |  7 +-
 .../storm/grouping/PartialKeyGrouping.java      |  7 +-
 .../storm/testing/TestWordBytesCounter.java     | 27 ++++++++
 .../backtype/storm/testing/TestWordCounter.java |  6 +-
 .../backtype/storm/testing/WordBytesCount.java  | 35 ++++++++++
 .../test/clj/backtype/storm/grouping_test.clj   | 67 ++++++++++++--------
 7 files changed, 118 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b132520a/storm-core/src/clj/backtype/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj
index 3bc1d05..9cf2b85 100644
--- a/storm-core/src/clj/backtype/storm/daemon/task.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -27,7 +27,6 @@
   (:import [backtype.storm.spout ShellSpout])
   (:import [java.util Collection List ArrayList])
   (:require [backtype.storm
-             [tuple :as tuple]
              [thrift :as thrift]
              [stats :as stats]])
   (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics]))

http://git-wip-us.apache.org/repos/asf/storm/blob/b132520a/storm-core/src/clj/backtype/storm/tuple.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/tuple.clj b/storm-core/src/clj/backtype/storm/tuple.clj
index a88b73b..790a823 100644
--- a/storm-core/src/clj/backtype/storm/tuple.clj
+++ b/storm-core/src/clj/backtype/storm/tuple.clj
@@ -15,8 +15,11 @@
 ;; limitations under the License.
 
 (ns backtype.storm.tuple
-  (:import [java.util List]))
+  (:import [java.util Arrays List]))
 
 (defn list-hash-code
   [^List alist]
-  (.hashCode alist))
+  (if (nil? alist)
+    1
+    (let [^"[Ljava.lang.Object;" array (.toArray alist)] ;; Object[]
+      (Arrays/deepHashCode array))))

http://git-wip-us.apache.org/repos/asf/storm/blob/b132520a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
index d1f534b..456a8ef 100644
--- a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
+++ b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
@@ -20,6 +20,7 @@ package backtype.storm.grouping;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import backtype.storm.generated.GlobalStreamId;
@@ -65,7 +66,11 @@ public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
                 List<Object> selectedFields = outFields.select(fields, values);
                 ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                 for (Object o: selectedFields) {
-                    out.putInt(o.hashCode());
+                    if (o instanceof Object[]) {
+                        out.putInt(Arrays.deepHashCode((Object[])o));
+                    } else {
+                        out.putInt(o.hashCode());
+                    }
                 }
                 raw = out.array();
             } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/b132520a/storm-core/src/jvm/backtype/storm/testing/TestWordBytesCounter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestWordBytesCounter.java b/storm-core/src/jvm/backtype/storm/testing/TestWordBytesCounter.java
new file mode 100644
index 0000000..e8a09a7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/TestWordBytesCounter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import backtype.storm.tuple.Tuple;
+
+public class TestWordBytesCounter extends TestWordCounter {
+    @Override
+    protected String getTupleValue(Tuple t, int idx) {
+      return new String(t.getBinary(idx));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b132520a/storm-core/src/jvm/backtype/storm/testing/TestWordCounter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestWordCounter.java b/storm-core/src/jvm/backtype/storm/testing/TestWordCounter.java
index 551b054..08b1397 100644
--- a/storm-core/src/jvm/backtype/storm/testing/TestWordCounter.java
+++ b/storm-core/src/jvm/backtype/storm/testing/TestWordCounter.java
@@ -38,9 +38,13 @@ public class TestWordCounter extends BaseBasicBolt {
     public void prepare(Map stormConf, TopologyContext context) {
         _counts = new HashMap<String, Integer>();
     }
+
+    protected String getTupleValue(Tuple t, int idx) {
+      return (String) t.getValues().get(idx);
+    }
     
     public void execute(Tuple input, BasicOutputCollector collector) {
-        String word = (String) input.getValues().get(0);
+        String word = getTupleValue(input, 0);
         int count = 0;
         if(_counts.containsKey(word)) {
             count = _counts.get(word);

http://git-wip-us.apache.org/repos/asf/storm/blob/b132520a/storm-core/src/jvm/backtype/storm/testing/WordBytesCount.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/WordBytesCount.java b/storm-core/src/jvm/backtype/storm/testing/WordBytesCount.java
new file mode 100644
index 0000000..31fdca6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/WordBytesCount.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WordBytesCount extends WordCount {
+  @Override
+  protected String getWordFromTuple(Tuple tuple, int idx) {
+    return new String(tuple.getBinary(idx));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b132520a/storm-core/test/clj/backtype/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/grouping_test.clj b/storm-core/test/clj/backtype/storm/grouping_test.clj
index fc13b0c..9402842 100644
--- a/storm-core/test/clj/backtype/storm/grouping_test.clj
+++ b/storm-core/test/clj/backtype/storm/grouping_test.clj
@@ -15,40 +15,53 @@
 ;; limitations under the License.
 (ns backtype.storm.grouping-test
   (:use [clojure test])
-  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter NGrouping]
+  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping]
            [backtype.storm.generated JavaObject JavaObjectArg])
   (:use [backtype.storm testing clojure])
   (:use [backtype.storm.daemon common])
   (:require [backtype.storm [thrift :as thrift]]))
 
-(deftest test-shuffle
+ (deftest test-shuffle
+   (with-simulated-time-local-cluster [cluster :supervisors 4]
+     (let [topology (thrift/mk-topology
+                     {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
+                     {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
+                                             :parallelism-hint 6)
+                      })
+           results (complete-topology cluster
+                                      topology
+                                      ;; important for test that
+                                      ;; #tuples = multiple of 4 and 6
+                                      :mock-sources {"1" (->> [["a"] ["b"]]
+                                                              (repeat 12)
+                                                              (apply concat))})]
+       (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
+                (read-tuples results "2")))
+       )))
+
+(deftest test-field
   (with-simulated-time-local-cluster [cluster :supervisors 4]
-    (let [topology (thrift/mk-topology
-                    {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
-                    {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
-                                            :parallelism-hint 6)
+    (let [spout-phint 4
+          bolt-phint 6
+          topology (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true)
+                                               :parallelism-hint spout-phint)}
+                    {"2" (thrift/mk-bolt-spec {"1" ["word"]}
+                                              (TestWordBytesCounter.)
+                                              :parallelism-hint bolt-phint)
                      })
-          results (complete-topology cluster
-                                     topology
-                                     ;; important for test that
-                                     ;; #tuples = multiple of 4 and 6
-                                     :mock-sources {"1" [["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                         ["a"] ["b"]
-                                                       ]}
-                                     )]
-      (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
-               (read-tuples results "2")))
-      )))
+          results (complete-topology
+                    cluster
+                    topology
+                    :mock-sources {"1" (->> [[(.getBytes "a")]
+                                             [(.getBytes "b")]]
+                                            (repeat (* spout-phint bolt-phint))
+                                            (apply concat))})]
+      (is (ms= (apply concat
+                      (for [value '("a" "b")
+                            sum (range 1 (inc (* spout-phint bolt-phint)))]
+                        [[value sum]]))
+               (read-tuples results "2"))))))
 
 (defbolt id-bolt ["val"] [tuple collector]
   (emit-bolt! collector (.getValues tuple))

Reply via email to