Updated Branches: refs/heads/trunk 738fcd2a1 -> 7d0e00699
GIRAPH-736: Bring back FindBugs (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7d0e0069 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7d0e0069 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7d0e0069 Branch: refs/heads/trunk Commit: 7d0e006992574973bbd732373af32462393f00b5 Parents: 738fcd2 Author: Nitay Joffe <[email protected]> Authored: Thu Aug 15 23:21:50 2013 -0400 Committer: Nitay Joffe <[email protected]> Committed: Thu Aug 15 23:22:42 2013 -0400 ---------------------------------------------------------------------- CHANGELOG | 2 + findbugs-exclude.xml | 62 +++++++++++++------- .../java/org/apache/giraph/bsp/BspService.java | 5 +- .../out_of_core/SequentialFileMessageStore.java | 17 ++++-- .../giraph/comm/netty/SaslNettyServer.java | 7 ++- .../netty/handler/RequestServerHandler.java | 9 ++- .../netty/handler/ResponseClientHandler.java | 9 ++- .../comm/netty/handler/SaslServerHandler.java | 9 ++- .../java/org/apache/giraph/conf/AllOptions.java | 8 +-- .../jython/wrappers/JythonWrapperBase.java | 3 + .../jython/wrappers/JythonWritableWrapper.java | 19 ++++++ .../apache/giraph/master/BspServiceMaster.java | 14 +++-- .../partition/DiskBackedPartitionStore.java | 30 +++++++--- .../giraph/partition/PartitionBalancer.java | 13 ++++ .../apache/giraph/partition/PartitionUtils.java | 11 +++- .../org/apache/giraph/utils/EdgeComparator.java | 7 ++- .../java/org/apache/giraph/utils/FileUtils.java | 15 ++++- .../main/java/org/apache/giraph/utils/JMap.java | 3 +- .../apache/giraph/worker/BspServiceWorker.java | 3 +- .../org/apache/giraph/zk/ZooKeeperManager.java | 6 +- .../examples/RandomWalkWorkerContext.java | 13 +++- .../giraph/examples/SimpleAggregatorWriter.java | 11 +++- .../giraph/examples/SimpleCheckpoint.java | 13 +++- .../giraph/examples/SimpleFailComputation.java | 11 +++- .../SimpleMasterComputeComputation.java | 2 +- .../SimpleTriangleClosingComputation.java | 18 ++++++ .../giraph/rexster/utils/RexsterUtils.java | 9 +-- 27 files changed, 259 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index c9b8a15..b8dd886 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-736: Bring back FindBugs (nitay) + GIRAPH-735: DiskBackedPartitionStore throws NPE due to uninitialized OutEdges (claudio) http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml index 6cc5f7a..21aa4ef 100644 --- a/findbugs-exclude.xml +++ b/findbugs-exclude.xml @@ -19,61 +19,81 @@ <FindBugsFilter> <Match> + <Class name="org.apache.giraph.hive.column.HiveColumnWriter"/> + <Bug pattern="BC_UNCONFIRMED_CAST"/> + </Match> + <Match> + <Class name="org.apache.giraph.hive.record.HiveRecordWriter"/> + <Bug pattern="BC_UNCONFIRMED_CAST"/> + </Match> + <Match> + <Class name="org.apache.giraph.conf.AllOptions"/> <Bug pattern="DM_DEFAULT_ENCODING"/> </Match> <Match> - <Bug pattern="DM_EXIT"/> + <Class name="org.apache.giraph.zk.ZooKeeperManager"/> + <Bug pattern="DM_DEFAULT_ENCODING"/> </Match> <Match> - <Bug pattern="EI_EXPOSE_REP"/> + <Class name="org.apache.giraph.examples.SimpleFailComputation"/> + <Bug pattern="DM_EXIT"/> </Match> <Match> - <Bug pattern="EI_EXPOSE_REP2"/> + <Class name="org.apache.giraph.graph.GraphMapper$OverrideExceptionHandler"/> + <Bug pattern="DM_EXIT"/> </Match> <Match> - <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/> + <Class name="org.apache.giraph.worker.BspServiceWorker"/> + <Bug pattern="DM_EXIT"/> </Match> <Match> - <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/> + <Class name="org.apache.giraph.examples.SimpleCheckpoint$SimpleCheckpointComputation"/> + <Bug pattern="DM_EXIT"/> </Match> <Match> - <Bug pattern="NS_DANGEROUS_NON_SHORT_CIRCUIT"/> + <Bug pattern="EI_EXPOSE_REP"/> </Match> <Match> - <Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/> + <Bug pattern="EI_EXPOSE_REP2"/> </Match> <Match> - <Bug pattern="RV_ABSOLUTE_VALUE_OF_RANDOM_INT"/> + <Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC"/> + <Class name="org.apache.giraph.jython.wrappers.JythonWrapperBase"/> </Match> <Match> - <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/> + <Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC"/> + <Class name="org.apache.giraph.jython.wrappers.JythonWritableWrapper"/> </Match> <Match> - <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/> + <Class name="org.apache.giraph.comm.netty.NettyClient"/> + <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/> </Match> <Match> - <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/> + <Class name="org.apache.giraph.comm.netty.handler.RequestServerHandler"/> + <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/> </Match> <Match> - <Bug pattern="UUF_UNUSED_FIELD"/> + <Class name="org.apache.giraph.comm.netty.handler.ResponseClientHandler"/> + <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/> </Match> <Match> - <Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/> + <Class name="org.apache.giraph.io.formats.PseudoRandomLocalEdgesHelper"/> + <Bug pattern="RV_ABSOLUTE_VALUE_OF_RANDOM_INT"/> </Match> <Match> - <Class name="org.apache.giraph.jython.wrappers.JythonWrapperBase"/> - <Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC"/> + <Class name="org.apache.giraph.master.BspServiceMaster"/> + <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/> </Match> <Match> - <Class name="org.apache.giraph.jython.JythonWritableWrapper"/> - <Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC"/> + <Class name="org.apache.giraph.partition.PartitionBalancer$PartitionOwnerComparator"/> + <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/> </Match> <Match> - <Class name="org.apache.giraph.hive.column.HiveColumnWriter"/> - <Bug pattern="BC_UNCONFIRMED_CAST"/> + <Class name="org.apache.giraph.partition.DiskBackedPartitionStore$AddPartition"/> + <Bug pattern="UL_UNRELEASED_LOCK"/> </Match> <Match> - <Class name="org.apache.giraph.hive.record.HiveRecordWriter"/> - <Bug pattern="BC_UNCONFIRMED_CAST"/> + <Class name="org.apache.giraph.partition.DiskBackedPartitionStore$GetPartition"/> + <Bug pattern="UL_UNRELEASED_LOCK"/> </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index 42e8e7e..aae01da 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -47,6 +47,7 @@ import org.json.JSONObject; import java.io.IOException; import java.net.UnknownHostException; +import java.nio.charset.Charset; import java.security.InvalidParameterException; import java.util.ArrayList; import java.util.Collections; @@ -690,8 +691,8 @@ public abstract class BspService<I extends WritableComparable, return null; } jobState = - new String(getZkExt().getData( - childList.get(childList.size() - 1), true, null)); + new String(getZkExt().getData(childList.get(childList.size() - 1), + true, null), Charset.defaultCharset()); } catch (KeeperException.NoNodeException e) { LOG.info("getJobState: Job state path is empty! - " + masterJobStatePath); http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java index 89520ff..64031c3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java @@ -116,9 +116,13 @@ public class SequentialFileMessageStore<I extends WritableComparable, if (LOG.isDebugEnabled()) { LOG.debug("addMessages: Deleting " + file); } - file.delete(); + if (!file.delete()) { + throw new IOException("Failed to delete existing file " + file); + } + } + if (!file.createNewFile()) { + throw new IOException("Failed to create file " + file); } - file.createNewFile(); if (LOG.isDebugEnabled()) { LOG.debug("addMessages: Creating " + file); } @@ -196,7 +200,9 @@ public class SequentialFileMessageStore<I extends WritableComparable, */ public void clearAll() throws IOException { endReading(); - file.delete(); + if (!file.delete()) { + LOG.error("clearAll: Failed to delete file " + file); + } } @Override @@ -390,7 +396,10 @@ public class SequentialFileMessageStore<I extends WritableComparable, String directory = path + File.separator + jobId + File.separator + taskId + File.separator; directories[i++] = directory; - new File(directory).mkdirs(); + if (!new File(directory).mkdirs()) { + LOG.error("SequentialFileMessageStore$Factory: Failed to create " + + directory); + } } this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config); storeCounter = new AtomicInteger(); http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java index 2cbf482..00a802f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/SaslNettyServer.java @@ -39,6 +39,7 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import java.io.IOException; +import java.nio.charset.Charset; /** * Encapsulates SASL server logic for Giraph BSP worker servers. @@ -120,7 +121,8 @@ else[HADOOP_1_SECRET_MANAGER]*/ * @return Base64-encoded string */ static String encodeIdentifier(byte[] identifier) { - return new String(Base64.encodeBase64(identifier)); + return new String(Base64.encodeBase64(identifier), + Charset.defaultCharset()); } /** @@ -129,7 +131,8 @@ else[HADOOP_1_SECRET_MANAGER]*/ * @return password as a char array. */ static char[] encodePassword(byte[] password) { - return new String(Base64.encodeBase64(password)).toCharArray(); + return new String(Base64.encodeBase64(password), + Charset.defaultCharset()).toCharArray(); } /** CallbackHandler for SASL DIGEST-MD5 mechanism */ http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java index a02039e..781e37c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java @@ -90,7 +90,7 @@ public abstract class RequestServerHandler<R> extends LOG.info("messageReceived: Simulating closing channel on first " + "request " + writableRequest.getRequestId() + " from " + writableRequest.getClientId()); - ALREADY_CLOSED_FIRST_REQUEST = true; + setAlreadyClosedFirstRequest(); ctx.getChannel().close(); return; } @@ -129,6 +129,13 @@ public abstract class RequestServerHandler<R> extends } /** + * Set the flag indicating already closed first request + */ + private static void setAlreadyClosedFirstRequest() { + ALREADY_CLOSED_FIRST_REQUEST = true; + } + + /** * Process request * * @param request Request to process http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java index 9f3f034..1ba06e9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java @@ -90,7 +90,7 @@ public class ResponseClientHandler extends SimpleChannelUpstreamHandler { if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) { LOG.info("messageReceived: Simulating dropped response " + response + " for request " + requestId); - ALREADY_DROPPED_FIRST_RESPONSE = true; + setAlreadyDroppedFirstResponse(); synchronized (workerIdOutstandingRequestMap) { workerIdOutstandingRequestMap.notifyAll(); } @@ -123,6 +123,13 @@ public class ResponseClientHandler extends SimpleChannelUpstreamHandler { } } + /** + * Set already dropped first response flag + */ + private static void setAlreadyDroppedFirstResponse() { + ALREADY_DROPPED_FIRST_RESPONSE = true; + } + @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java index 9644a5f..922f373 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java @@ -92,7 +92,7 @@ public class SaslServerHandler extends LOG.info("messageReceived: Simulating closing channel on first " + "request " + writableRequest.getRequestId() + " from " + writableRequest.getClientId()); - ALREADY_CLOSED_FIRST_REQUEST = true; + setAlreadyClosedFirstRequest(); ctx.getChannel().close(); return; } @@ -153,6 +153,13 @@ public class SaslServerHandler extends } /** + * Set already closed first request flag + */ + private static void setAlreadyClosedFirstRequest() { + ALREADY_CLOSED_FIRST_REQUEST = true; + } + + /** * Load Hadoop Job Token into secret manager. * * @param conf Configuration http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java index 1f9fb78..ea9a370 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java @@ -17,7 +17,9 @@ */ package org.apache.giraph.conf; -import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS; +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; import java.io.BufferedWriter; import java.io.FileWriter; @@ -25,9 +27,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import org.apache.log4j.Logger; - -import com.google.common.collect.Lists; +import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS; /** * Tracks all of the Giraph options http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWrapperBase.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWrapperBase.java b/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWrapperBase.java index 4ec450a..56fd13c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWrapperBase.java +++ b/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWrapperBase.java @@ -59,6 +59,9 @@ public class JythonWrapperBase extends PyObject { @Override public boolean equals(Object obj) { + if (obj == null) { + return false; + } if (this == obj) { return true; } http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWritableWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWritableWrapper.java b/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWritableWrapper.java index 4788634..17f8b60 100644 --- a/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWritableWrapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/jython/wrappers/JythonWritableWrapper.java @@ -90,4 +90,23 @@ public class JythonWritableWrapper extends JythonWrapperBase } return result; } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (this == obj) { + return true; + } + if (obj instanceof JythonWritableWrapper) { + return compareTo(obj) == 0; + } + return false; + } + + @Override + public int hashCode() { + return getPyObject().__hash__().asInt(); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index d08495b..454c934 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -92,6 +92,7 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -264,7 +265,7 @@ public class BspServiceMaster<I extends WritableComparable, } try { getZkExt().createExt(masterJobStatePath + "/jobState", - jobState.toString().getBytes(), + jobState.toString().getBytes(Charset.defaultCharset()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, true); @@ -597,7 +598,8 @@ public class BspServiceMaster<I extends WritableComparable, if (getZkExt().exists(inputSplitsPath, false) != null) { LOG.info(inputSplitsPath + " already exists, no need to create"); return Integer.parseInt( - new String(getZkExt().getData(inputSplitsPath, false, null))); + new String(getZkExt().getData(inputSplitsPath, false, null), + Charset.defaultCharset())); } } catch (KeeperException.NoNodeException e) { if (LOG.isInfoEnabled()) { @@ -940,7 +942,8 @@ public class BspServiceMaster<I extends WritableComparable, try { byte [] zkData = getZkExt().getData(finishedPath, false, null); - workerFinishedInfoObj = new JSONObject(new String(zkData)); + workerFinishedInfoObj = new JSONObject(new String(zkData, + Charset.defaultCharset())); List<PartitionStats> statsList = WritableUtils.readListFieldsFromByteArray( Base64.decode(workerFinishedInfoObj.getString( @@ -1019,7 +1022,8 @@ public class BspServiceMaster<I extends WritableComparable, throw new RuntimeException( "printAggregatedMetricsToHDFS: metrics file exists"); } - out = new PrintStream(fs.create(outFile)); + out = new PrintStream(fs.create(outFile), false, + Charset.defaultCharset().name()); aggregatedMetrics.print(superstep, out); } catch (IOException e) { throw new RuntimeException( @@ -2040,7 +2044,7 @@ public class BspServiceMaster<I extends WritableComparable, if (LOG.isDebugEnabled()) { LOG.debug("call: Created input split " + "with index " + index + " serialized as " + - byteArrayOutputStream.toString()); + byteArrayOutputStream.toString(Charset.defaultCharset().name())); } } catch (KeeperException.NodeExistsException e) { if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java index 1a16dbe..110ce9d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java @@ -462,7 +462,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, inputStream = null; } } - file.delete(); + if (!file.delete()) { + LOG.error("loadPartition: Failed to delete file " + file); + } file = new File(getEdgesPath(id)); if (LOG.isDebugEnabled()) { LOG.debug("loadPartition: loading partition edges " + @@ -483,7 +485,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, * If the graph is static, keep the file around. */ if (!conf.isStaticGraph()) { - file.delete(); + if (!file.delete()) { + LOG.error("loadPartition: Failed to delete file " + file); + } } return partition; } @@ -497,8 +501,12 @@ public class DiskBackedPartitionStore<I extends WritableComparable, private void offloadPartition(Partition<I, V, E> partition) throws IOException { File file = new File(getVerticesPath(partition.getId())); - file.getParentFile().mkdirs(); - file.createNewFile(); + if (!file.getParentFile().mkdirs()) { + LOG.error("offloadPartition: Failed to create directory " + file); + } + if (!file.createNewFile()) { + LOG.error("offloadPartition: Failed to create file " + file); + } if (LOG.isDebugEnabled()) { LOG.debug("offloadPartition: writing partition vertices " + partition.getId() + " to " + file.getAbsolutePath()); @@ -522,7 +530,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, * the graph is not changing. */ if (!conf.isStaticGraph() || !file.exists()) { - file.createNewFile(); + if (!file.createNewFile()) { + LOG.error("offloadPartition: Failed to create file " + file); + } if (LOG.isDebugEnabled()) { LOG.debug("offloadPartition: writing partition edges " + partition.getId() + " to " + file.getAbsolutePath()); @@ -588,9 +598,13 @@ public class DiskBackedPartitionStore<I extends WritableComparable, */ public void deletePartitionFiles(Integer id) { File file = new File(getVerticesPath(id)); - file.delete(); + if (!file.delete()) { + LOG.error("deletePartitionFiles: Failed to delete file " + file); + } file = new File(getEdgesPath(id)); - file.delete(); + if (!file.delete()) { + LOG.error("deletePartitionFiles: Failed to delete file " + file); + } } /** @@ -671,7 +685,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, * Wait until we have space in memory or inactive data for a switch */ while (inMemoryPartitions >= maxInMemoryPartitions && - inactive.size() == 0) { + inactive.isEmpty()) { notEmpty.await(); } /* http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java index 2befa9c..3454d62 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java @@ -22,6 +22,8 @@ import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; +import com.google.common.base.Objects; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -184,6 +186,17 @@ public class PartitionBalancer { return (int) (getValue() - ((WorkerInfoAssignments) other).getValue()); } + + @Override + public boolean equals(Object obj) { + return obj instanceof WorkerInfoAssignments && + compareTo((WorkerInfoAssignments) obj) == 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(value); + } } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java index fc75006..b055f4d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java @@ -27,6 +27,7 @@ import org.apache.log4j.Logger; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -53,7 +54,10 @@ public class PartitionUtils { * Compare edge counts for Entry<WorkerInfo, VertexEdgeCount> objects. */ private static class EdgeCountComparator implements - Comparator<Entry<WorkerInfo, VertexEdgeCount>> { + Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable { + /** Serialization version. */ + private static final long serialVersionUID = 1L; + @Override public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1, Entry<WorkerInfo, VertexEdgeCount> worker2) { @@ -67,7 +71,10 @@ public class PartitionUtils { * {@link VertexEdgeCount}. */ private static class VertexCountComparator implements - Comparator<Entry<WorkerInfo, VertexEdgeCount>> { + Comparator<Entry<WorkerInfo, VertexEdgeCount>>, Serializable { + /** Serialization version. */ + private static final long serialVersionUID = 1L; + @Override public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1, Entry<WorkerInfo, VertexEdgeCount> worker2) { http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java index df40f01..7539743 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java @@ -22,6 +22,7 @@ import com.google.common.collect.ComparisonChain; import org.apache.giraph.edge.Edge; import org.apache.hadoop.io.WritableComparable; +import java.io.Serializable; import java.util.Comparator; /** @@ -31,7 +32,11 @@ import java.util.Comparator; * @param <E> Edge value (needs to be WritableComparable) */ public class EdgeComparator<I extends WritableComparable, - E extends WritableComparable> implements Comparator<Edge<I, E>> { + E extends WritableComparable> implements Comparator<Edge<I, E>>, + Serializable { + /** Serialization version. */ + private static final long serialVersionUID = 1L; + @Override public int compare(Edge<I, E> e1, Edge<I, E> e2) { return compareEdges(e1, e2); http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java index 1bc11a8..6e8b1e3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java @@ -21,6 +21,7 @@ package org.apache.giraph.utils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; import com.google.common.base.Charsets; import com.google.common.io.Closeables; @@ -35,6 +36,8 @@ import java.io.Writer; * Helper class for filesystem operations during testing */ public class FileUtils { + /** Logger */ + private static final Logger LOG = Logger.getLogger(FileUtils.class); /** * Utility class should not be instantiatable @@ -85,7 +88,9 @@ public class FileUtils { public static File createTempDir(File parent, String name) throws IOException { File dir = createTestTempFileOrDir(parent, name, true); - dir.delete(); + if (!dir.delete()) { + LOG.error("createTempDir: Failed to create directory " + dir); + } return dir; } @@ -146,9 +151,13 @@ public class FileUtils { @Override public boolean accept(File f) { if (!f.isFile()) { - f.listFiles(this); + if (f.listFiles(this) == null) { + LOG.error("accept: Failed to list files of " + f); + } + } + if (!f.delete()) { + LOG.error("accept: Failed to delete file " + f); } - f.delete(); return false; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java index 69d1ab3..19c8efd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMap.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.lang.management.ManagementFactory; +import java.nio.charset.Charset; import java.util.Date; /** @@ -69,7 +70,7 @@ public class JMap { try { Process p = Runtime.getRuntime().exec(CMD + ARGS + getProcessId()); BufferedReader in = new BufferedReader( - new InputStreamReader(p.getInputStream())); + new InputStreamReader(p.getInputStream(), Charset.defaultCharset())); printStream.println("JMap histo dump at " + new Date()); String line = in.readLine(); for (int i = 0; i < numLines && line != null; ++i) { http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index da1e7fb..9311fbd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -92,6 +92,7 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -893,7 +894,7 @@ public class BspServiceWorker<I extends WritableComparable, "/" + getHostnamePartitionId(); try { getZkExt().createExt(finishedWorkerPath, - workerFinishedInfoObj.toString().getBytes(), + workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true); http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java index 7a374b1..3364dfd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java @@ -45,6 +45,7 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; @@ -216,7 +217,8 @@ public class ZooKeeperManager { public StreamCollector(final InputStream is) { super(StreamCollector.class.getName()); setDaemon(true); - InputStreamReader streamReader = new InputStreamReader(is); + InputStreamReader streamReader = new InputStreamReader(is, + Charset.defaultCharset()); bufferedReader = new BufferedReader(streamReader); } @@ -602,7 +604,7 @@ public class ZooKeeperManager { */ public void onlineZooKeeperServers() { Integer taskId = zkServerPortMap.get(myHostname); - if ((taskId != null) && (taskId.intValue() == taskPartition)) { + if ((taskId != null) && (taskId == taskPartition)) { File zkDirFile = new File(this.zkDir); try { if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java index 5c23b5a..0c16120 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java @@ -21,6 +21,7 @@ package org.apache.giraph.examples; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.nio.charset.Charset; import java.util.Set; import org.apache.giraph.worker.WorkerContext; @@ -124,7 +125,7 @@ public class RandomWalkWorkerContext extends WorkerContext { sourceFile = cacheFiles[0]; FileSystem fs = FileSystem.getLocal(configuration); BufferedReader in = new BufferedReader(new InputStreamReader( - fs.open(sourceFile))); + fs.open(sourceFile), Charset.defaultCharset())); String line; while ((line = in.readLine()) != null) { builder.add(Long.parseLong(line)); @@ -142,7 +143,15 @@ public class RandomWalkWorkerContext extends WorkerContext { @Override public void preApplication() throws InstantiationException, IllegalAccessException { - Configuration configuration = getContext().getConfiguration(); + setStaticVars(getContext().getConfiguration()); + } + + /** + * Set static variables from Configuration + * + * @param configuration the conf + */ + private void setStaticVars(Configuration configuration) { MAX_SUPERSTEPS = configuration.getInt(RandomWalkComputation.MAX_SUPERSTEPS, DEFAULT_MAX_SUPERSTEPS); TELEPORTATION_PROBABILITY = configuration.getFloat( http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java index b019b40..d3d5375 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java @@ -50,12 +50,21 @@ public class SimpleAggregatorWriter extends @Override public void initialize(Context context, long applicationAttempt) throws IOException { - FILENAME = "aggregatedValues_" + applicationAttempt; + setFilename(applicationAttempt); Path p = new Path(FILENAME); FileSystem fs = FileSystem.get(context.getConfiguration()); output = fs.create(p, true); } + /** + * Set filename written to + * + * @param applicationAttempt app attempt + */ + private static void setFilename(long applicationAttempt) { + FILENAME = "aggregatedValues_" + applicationAttempt; + } + @Override public void writeAggregator( Iterable<Entry<String, Writable>> aggregatorMap, http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java index 984e079..005754e 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java @@ -161,11 +161,20 @@ public class SimpleCheckpoint implements Tool { @Override public void postApplication() { - FINAL_SUM = this.<LongWritable>getAggregatedValue( - LongSumAggregator.class.getName()).get(); + setFinalSum(this.<LongWritable>getAggregatedValue( + LongSumAggregator.class.getName()).get()); LOG.info("FINAL_SUM=" + FINAL_SUM); } + /** + * Set the final sum + * + * @param value sum + */ + private static void setFinalSum(long value) { + FINAL_SUM = value; + } + @Override public void preSuperstep() { } http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java index 1582902..105a2fb 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java @@ -68,7 +68,16 @@ public class SimpleFailComputation extends BasicComputation< } else { vertex.voteToHalt(); } - SUPERSTEP = getSuperstep(); + setSuperstep(getSuperstep()); } } + + /** + * Set the superstep + * + * @param superstep to set + */ + private static void setSuperstep(long superstep) { + SUPERSTEP = superstep; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java index b9b2373..0c43ff9 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java @@ -80,7 +80,7 @@ public class SimpleMasterComputeComputation extends BasicComputation< public void postApplication() { } - public void setFinalSum(double sum) { + public static void setFinalSum(double sum) { FINAL_SUM = sum; } http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java index 8608d02..6a5029e 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java @@ -25,6 +25,7 @@ import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; +import com.google.common.base.Objects; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -134,6 +135,23 @@ public class SimpleTriangleClosingComputation extends BasicComputation< public int compareTo(Pair other) { return other.value - this.value; } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof Pair) { + Pair other = (Pair) obj; + return Objects.equal(value, other.value); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(value); + } } /** Utility class for delivering the array of vertices THIS vertex http://git-wip-us.apache.org/repos/asf/giraph/blob/7d0e0069/giraph-rexster/src/main/java/org/apache/giraph/rexster/utils/RexsterUtils.java ---------------------------------------------------------------------- diff --git a/giraph-rexster/src/main/java/org/apache/giraph/rexster/utils/RexsterUtils.java b/giraph-rexster/src/main/java/org/apache/giraph/rexster/utils/RexsterUtils.java index 23c9b58..e669ca9 100644 --- a/giraph-rexster/src/main/java/org/apache/giraph/rexster/utils/RexsterUtils.java +++ b/giraph-rexster/src/main/java/org/apache/giraph/rexster/utils/RexsterUtils.java @@ -33,6 +33,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -261,14 +262,14 @@ public abstract class RexsterUtils { username = GIRAPH_REXSTER_USERNAME.get(conf); password = GIRAPH_REXSTER_PASSWORD.get(conf); - auth = "Basic " + - Base64.encodeBase64URLSafeString( - (username + ":" + password).getBytes()); + byte[] authBytes = (username + ":" + password).getBytes( + Charset.defaultCharset()); + auth = "Basic " + Base64.encodeBase64URLSafeString(authBytes); connection = createConnection(url, auth); connection.setDoOutput(true); is = connection.getInputStream(); - isr = new InputStreamReader(is); + isr = new InputStreamReader(is, Charset.defaultCharset()); return new BufferedReader(isr);
