prateekm commented on a change in pull request #1079: SAMZA-2250: Support large job models in standalone.
URL: https://github.com/apache/samza/pull/1079#discussion_r296834168
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java
 ##########
 @@ -64,25 +69,47 @@ public void init() {
    */
   @Override
   public byte[] get(String key) {
-    return zkClient.readData(getZkPathForKey(key), true);
+    byte[] aggregatedZNodeValues = new byte[0];
+    for (int segmentIndex = 0;; ++segmentIndex) {
+      String zkPath = getZkPath(key, segmentIndex);
+      byte[] zNodeValue = zkClient.readData(zkPath, true);
+      if (zNodeValue == null) {
+        break;
+      }
+      aggregatedZNodeValues = Bytes.concat(aggregatedZNodeValues, zNodeValue);
+    }
+    if (aggregatedZNodeValues.length > 0) {
+      byte[] value = ArrayUtils.subarray(aggregatedZNodeValues, 0, aggregatedZNodeValues.length - CHECKSUM_SIZE_IN_BYTES);
+      byte[] checkSum = ArrayUtils.subarray(aggregatedZNodeValues, aggregatedZNodeValues.length - CHECKSUM_SIZE_IN_BYTES, aggregatedZNodeValues.length);
+      byte[] expectedCheckSum = getCRCChecksum(value);
+      if (!Arrays.equals(checkSum, expectedCheckSum)) {
+        throw new IllegalStateException("Expected checksum of value did not match the actual checksum");
 
 Review comment:
   I meant: log the checksum contents and the message payload (or include them
in the exception message), so we can tell what the corrupted and expected
messages are when debugging. This is a general-purpose method with many call
sites, so it would be good not to rely on the caller to remember to log.
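
   To illustrate the suggestion, here is a minimal sketch of a check that puts both checksums and a payload preview into the exception message. It is not the PR's code: the class name `ChecksumVerifier`, the `verifyAndStrip` helper, the 8-byte checksum layout, and the 64-byte preview cap are all assumptions made for the example, and `getCRCChecksum` is re-implemented here with `java.util.zip.CRC32` only so the snippet stands alone.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32;

// Hypothetical helper, not part of ZkMetadataStore; names and sizes are assumptions.
class ChecksumVerifier {
  // Assumption: the checksum is a CRC32 value stored as an 8-byte long.
  static final int CHECKSUM_SIZE_IN_BYTES = 8;
  // Assumption: cap the payload dump so large job models do not flood the logs.
  static final int MAX_PAYLOAD_PREVIEW_BYTES = 64;

  // Stand-in for the getCRCChecksum(value) call shown in the diff.
  public static byte[] getCRCChecksum(byte[] value) {
    CRC32 crc32 = new CRC32();
    crc32.update(value);
    return ByteBuffer.allocate(CHECKSUM_SIZE_IN_BYTES).putLong(crc32.getValue()).array();
  }

  // Renders bytes as lowercase hex so they are readable in a log line.
  public static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder(bytes.length * 2);
    for (byte b : bytes) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  // Splits value and trailing checksum, and throws with full context on mismatch.
  public static byte[] verifyAndStrip(String key, byte[] aggregated) {
    if (aggregated.length < CHECKSUM_SIZE_IN_BYTES) {
      throw new IllegalStateException(String.format(
          "Value for key %s is %d bytes, shorter than the %d-byte checksum",
          key, aggregated.length, CHECKSUM_SIZE_IN_BYTES));
    }
    int valueLength = aggregated.length - CHECKSUM_SIZE_IN_BYTES;
    byte[] value = Arrays.copyOfRange(aggregated, 0, valueLength);
    byte[] storedChecksum = Arrays.copyOfRange(aggregated, valueLength, aggregated.length);
    byte[] computedChecksum = getCRCChecksum(value);
    if (!Arrays.equals(storedChecksum, computedChecksum)) {
      byte[] preview = Arrays.copyOfRange(value, 0, Math.min(valueLength, MAX_PAYLOAD_PREVIEW_BYTES));
      throw new IllegalStateException(String.format(
          "Checksum mismatch for key %s: stored=%s, computed=%s, payload size=%d bytes, payload preview (hex)=%s",
          key, toHex(storedChecksum), toHex(computedChecksum), valueLength, toHex(preview)));
    }
    return value;
  }
}
```

   With something like this, every call site of `get` would see the key, both checksums, and the start of the payload in the stack trace without having to log anything itself. Whether to dump the full payload or only a preview is a judgment call; the cap above is just one option.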
