This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch 2.1.x-branch in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/2.1.x-branch by this push: new 0e5a68e STORM-1515: Fix LocalState Corruption 0e5a68e is described below commit 0e5a68e9d2e8d5e79a1af36203f52801ba6d4864 Author: Tim Frison <t...@invidi.com> AuthorDate: Tue Aug 13 11:28:31 2019 -0600 STORM-1515: Fix LocalState Corruption When a Windows machine has a power failure, the local state file can become corrupted with repeated NUL characters. On restart, when the supervisor attempts to get the worker's heartbeat it will fail to deserialize the LocalStateData (because it is all NUL characters) and it will fail to start the workers. --- .../src/jvm/org/apache/storm/utils/LocalState.java | 5 +++ .../test/java/org/apache/storm/LocalStateTest.java | 40 +++++++++++++++++----- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java index 6e1b44f..d5996d2 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java +++ b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java @@ -32,6 +32,7 @@ import org.apache.storm.shade.org.apache.commons.io.FileUtils; import org.apache.storm.thrift.TBase; import org.apache.storm.thrift.TDeserializer; import org.apache.storm.thrift.TSerializer; +import org.apache.storm.thrift.protocol.TProtocolException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,6 +124,10 @@ public class LocalState { } catch (Exception e) { attempts++; if (attempts >= 10) { + if (e.getCause() instanceof TProtocolException) { + LOG.warn("LocalState file is corrupted, resetting state.", e); + return new HashMap<>(); + } throw new RuntimeException(e); } } diff --git a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java index 97765b2..2a14857 100644 --- a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java +++ 
b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java @@ -57,15 +57,37 @@ public class LocalStateTest { @Test public void testEmptyState() throws IOException { - TmpPath tmp_dir = new TmpPath(); - String dir = tmp_dir.getPath(); - LocalState ls = new LocalState(dir, true); - GlobalStreamId gs_a = new GlobalStreamId("a", "a"); - FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345")); - FileOutputStream version = FileUtils.openOutputStream(new File(dir, "12345.version")); - Assert.assertNull(ls.get("c")); - ls.put("a", gs_a); - Assert.assertEquals(gs_a, ls.get("a")); + try (TmpPath tmp_dir = new TmpPath()) { + GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a"); + + String dir = tmp_dir.getPath(); + LocalState ls = new LocalState(dir, true); + + FileUtils.touch(new File(dir, "12345")); + FileUtils.touch(new File(dir, "12345.version")); + + Assert.assertNull(ls.get("c")); + ls.put("a", globalStreamId_a); + Assert.assertEquals(globalStreamId_a, ls.get("a")); + } + } + @Test + public void testAllNulState() throws IOException { + try (TmpPath tmp_dir = new TmpPath()) { + GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a"); + + String dir = tmp_dir.getPath(); + LocalState ls = new LocalState(dir, true); + + FileUtils.touch(new File(dir, "12345.version")); + + try (FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"))) { + Assert.assertNull(ls.get("c")); + data.write(new byte[100]); + ls.put("a", globalStreamId_a); + Assert.assertEquals(globalStreamId_a, ls.get("a")); + } + } } }