Author: edwardyoon
Date: Wed Aug 20 07:36:29 2014
New Revision: 1619042
URL: http://svn.apache.org/r1619042
Log:
HAMA-914: Boolean flag (isCompressed) is required only when runtime compression
is enabled.
Modified:
hama/trunk/CHANGES.txt
hama/trunk/conf/hama-default.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Aug 20 07:36:29 2014
@@ -17,6 +17,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-914: Boolean flag (isCompressed) is required only when runtime compression is enabled (edwardyoon)
HAMA-910: Web UI Improvement (Victor Lee via edwardyoon)
HAMA-909: Improve Mesos Scheduler's Fault Tolerance (Jeff Fenchel via
edwardyoon)
HAMA-823: Remove javadoc warnings (Victor Lee via edwardyoon)
Modified: hama/trunk/conf/hama-default.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Wed Aug 20 07:36:29 2014
@@ -284,6 +284,11 @@
</property>
<property>
+ <name>hama.messenger.runtime.compression</name>
+ <value>false</value>
+ <description>True if you want to enable runtime compression</description>
+ </property>
+ <property>
<name>hama.messenger.compression.class</name>
<value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
<description>The message compression algorithm to choose. Default is
null.</description>
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Wed Aug 20 07:36:29 2014
@@ -87,17 +87,20 @@ public class BSPMessageBundle<M extends
try {
serialized = serialize(message);
- if (compressor != null && serialized.length > threshold) {
- bufferDos.writeBoolean(true);
- compressed = compressor.compress(serialized);
- bufferDos.writeInt(compressed.length);
- bufferDos.write(compressed);
-
- bundleLength += compressed.length;
+ if (compressor != null) {
+ if (serialized.length > threshold) {
+ bufferDos.writeBoolean(true);
+ compressed = compressor.compress(serialized);
+ bufferDos.writeInt(compressed.length);
+ bufferDos.write(compressed);
+ bundleLength += compressed.length;
+ } else {
+ bufferDos.writeBoolean(false);
+ bufferDos.write(serialized);
+ bundleLength += serialized.length;
+ }
} else {
- bufferDos.writeBoolean(false);
bufferDos.write(serialized);
-
bundleLength += serialized.length;
}
} catch (IOException e) {
@@ -114,7 +117,7 @@ public class BSPMessageBundle<M extends
public byte[] getBuffer() {
return byteBuffer.toByteArray();
}
-
+
public Iterator<M> iterator() {
bis = new ByteArrayInputStream(byteBuffer.toByteArray());
dis = new DataInputStream(bis);
@@ -140,10 +143,13 @@ public class BSPMessageBundle<M extends
@Override
public M next() {
boolean isCompressed = false;
- try {
- isCompressed = dis.readBoolean();
- } catch (IOException e1) {
- e1.printStackTrace();
+
+ if (compressor != null) {
+ try {
+ isCompressed = dis.readBoolean();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
}
Class<M> clazz = null;
@@ -152,10 +158,12 @@ public class BSPMessageBundle<M extends
} catch (ClassNotFoundException e) {
LOG.error("Class was not found.", e);
}
+
msg = ReflectionUtils.newInstance(clazz, null);
try {
if (isCompressed) {
+ // LOG.debug(">>>>> decompressing .........");
int length = dis.readInt();
compressed = new byte[length];
dis.readFully(compressed);
@@ -215,7 +223,7 @@ public class BSPMessageBundle<M extends
@Override
public void readFields(DataInput in) throws IOException {
this.bundleSize = in.readInt();
-
+
if (this.bundleSize > 0) {
className = in.readUTF();
int bytesLength = in.readInt();
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Wed Aug 20 07:36:29 2014
@@ -353,8 +353,11 @@ public class LocalBSPRunner implements J
throws IOException {
peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
bundle.getLength());
- bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 512));
+
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 512));
+ }
Iterator<M> it = bundle.iterator();
while (it.hasNext()) {
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Wed Aug 20 07:36:29 2014
@@ -279,8 +279,10 @@ public abstract class AbstractMessageMan
@Override
public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
- bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 128));
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+ }
Iterator<? extends Writable> it = bundle.iterator();
while (it.hasNext()) {
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Wed Aug 20 07:36:29 2014
@@ -86,8 +86,10 @@ public class OutgoingPOJOMessageBundle<M
if (!outgoingBundles.containsKey(targetPeerAddress)) {
BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
- bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 128));
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+ }
outgoingBundles.put(targetPeerAddress, bundle);
}
return targetPeerAddress;
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Wed Aug 20 07:36:29 2014
@@ -50,7 +50,8 @@ public class TestBSPMasterGroomServer ex
public TestBSPMasterGroomServer() {
configuration = new HamaConfiguration();
configuration.set("bsp.master.address", "localhost");
- configuration.set("hama.child.redirect.log.console", "true");
+ configuration.setBoolean("hama.child.redirect.log.console", true);
+ configuration.setBoolean("hama.messenger.runtime.compression", true);
assertEquals("Make sure master addr is set to localhost:", "localhost",
configuration.get("bsp.master.address"));
configuration.set("bsp.local.dir", "/tmp/hama-test");
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java Wed Aug 20 07:36:29 2014
@@ -119,7 +119,10 @@ public class OutgoingVertexMessagesManag
if (!outgoingBundles.containsKey(targetPeerAddress)) {
BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>();
- bundle.setCompressor(compressor, conf.getLong("hama.messenger.compression.threshold", 128));
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+ }
outgoingBundles.put(targetPeerAddress, bundle);
}
return targetPeerAddress;