This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new d212b59 STORM-1515: Fix LocalState Corruption new 23cd491 Merge pull request #3101 from frison/STORM-1515 d212b59 is described below commit d212b59cd05d2c67a9b09d0c6a98d241f77824fb Author: Tim Frison <t...@invidi.com> AuthorDate: Tue Aug 13 11:28:31 2019 -0600 STORM-1515: Fix LocalState Corruption When a Windows machine has a power failure, the local state file can become corrupted with repeated NUL characters. On restart, when the supervisor attempts to get the worker's heartbeat, it will fail to deserialize the LocalStateData (because the file contains only NUL characters) and it will therefore fail to start the workers. --- .../src/jvm/org/apache/storm/utils/LocalState.java | 5 +++ .../test/java/org/apache/storm/LocalStateTest.java | 40 +++++++++++++++++----- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java index 6e1b44f..d5996d2 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java +++ b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java @@ -32,6 +32,7 @@ import org.apache.storm.shade.org.apache.commons.io.FileUtils; import org.apache.storm.thrift.TBase; import org.apache.storm.thrift.TDeserializer; import org.apache.storm.thrift.TSerializer; +import org.apache.storm.thrift.protocol.TProtocolException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,6 +124,10 @@ public class LocalState { } catch (Exception e) { attempts++; if (attempts >= 10) { + if (e.getCause() instanceof TProtocolException) { + LOG.warn("LocalState file is corrupted, resetting state.", e); + return new HashMap<>(); + } throw new RuntimeException(e); } } diff --git a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java index 97765b2..2a14857 100644 --- 
a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java +++ b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java @@ -57,15 +57,37 @@ public class LocalStateTest { @Test public void testEmptyState() throws IOException { - TmpPath tmp_dir = new TmpPath(); - String dir = tmp_dir.getPath(); - LocalState ls = new LocalState(dir, true); - GlobalStreamId gs_a = new GlobalStreamId("a", "a"); - FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345")); - FileOutputStream version = FileUtils.openOutputStream(new File(dir, "12345.version")); - Assert.assertNull(ls.get("c")); - ls.put("a", gs_a); - Assert.assertEquals(gs_a, ls.get("a")); + try (TmpPath tmp_dir = new TmpPath()) { + GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a"); + + String dir = tmp_dir.getPath(); + LocalState ls = new LocalState(dir, true); + + FileUtils.touch(new File(dir, "12345")); + FileUtils.touch(new File(dir, "12345.version")); + + Assert.assertNull(ls.get("c")); + ls.put("a", globalStreamId_a); + Assert.assertEquals(globalStreamId_a, ls.get("a")); + } + } + @Test + public void testAllNulState() throws IOException { + try (TmpPath tmp_dir = new TmpPath()) { + GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a"); + + String dir = tmp_dir.getPath(); + LocalState ls = new LocalState(dir, true); + + FileUtils.touch(new File(dir, "12345.version")); + + try (FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"))) { + Assert.assertNull(ls.get("c")); + data.write(new byte[100]); + ls.put("a", globalStreamId_a); + Assert.assertEquals(globalStreamId_a, ls.get("a")); + } + } } }