Repository: kafka Updated Branches: refs/heads/1.0 2b5f69695 -> d3c46ceb7
KAFKA-6167: Timestamp on streams directory contains a colon, which is an illegal character - change segment delimiter to . - added upgrade path - added test for old and new upgrade path Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy <[email protected]>, Guozhang Wang <[email protected]> Closes #4210 from mjsax/kafka-6167-windows-issue (cherry picked from commit 539c4d53f8fac65063e4e519c6a51911550a151f) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d3c46ceb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d3c46ceb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d3c46ceb Branch: refs/heads/1.0 Commit: d3c46ceb71cc05db02b3fa5b89e788066a889370 Parents: 2b5f696 Author: Matthias J. Sax <[email protected]> Authored: Wed Nov 15 17:43:35 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 15 17:43:45 2017 -0800 ---------------------------------------------------------------------- .../kafka/streams/state/internals/Segments.java | 53 +++++++++----- .../RocksDBSegmentedBytesStoreTest.java | 26 ++++++- .../streams/state/internals/SegmentsTest.java | 73 ++++++++++++++++---- 3 files changed, 120 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d3c46ceb/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 7c6bb53..5993972 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -28,7 +28,6 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.SimpleTimeZone; @@ -67,9 +66,11 @@ class Segments { } String segmentName(final long segmentId) { - // previous format used - as a separator so if this changes in the future + // (1) previous format used - as a separator so if this changes in the future // then we should use something different. - return name + ":" + segmentId * segmentInterval; + // (2) previous format used : as a separator (which did break KafkaStreams on Windows OS) + // so if this changes in the future then we should use something different. + return name + "." + segmentId * segmentInterval; } Segment getSegmentForTimestamp(final long timestamp) { @@ -190,33 +191,49 @@ class Segments { private long segmentIdFromSegmentName(final String segmentName, final File parent) { + final int segmentSeparatorIndex = name.length(); + final char segmentSeparator = segmentName.charAt(segmentSeparatorIndex); + final String segmentIdString = segmentName.substring(segmentSeparatorIndex + 1); + final long segmentId; + // old style segment name with date - if (segmentName.charAt(name.length()) == '-') { - final String datePart = segmentName.substring(name.length() + 1); - final Date date; + if (segmentSeparator == '-') { try { - date = formatter.parse(datePart); - final long segmentId = date.getTime() / segmentInterval; - final File newName = new File(parent, segmentName(segmentId)); - final File oldName = new File(parent, segmentName); - if (!oldName.renameTo(newName)) { - throw new ProcessorStateException("Unable to rename old style segment from: " - + oldName - + " to new name: " - + newName); - } - return segmentId; + segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval; } catch (ParseException e) { log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName); return -1L; } + renameSegmentFile(parent, segmentName, segmentId); } else { + // for both new formats (with : or .) parse segment ID identically try { - return Long.parseLong(segmentName.substring(name.length() + 1)) / segmentInterval; + segmentId = Long.parseLong(segmentIdString) / segmentInterval; } catch (NumberFormatException e) { throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName); } + + // intermediate segment name with : breaks KafkaStreams on Windows OS -> rename segment file to new name with . + if (segmentSeparator == ':') { + renameSegmentFile(parent, segmentName, segmentId); + } } + return segmentId; + } + + private void renameSegmentFile(final File parent, + final String segmentName, + final long segmentId) { + final File newName = new File(parent, segmentName(segmentId)); + final File oldName = new File(parent, segmentName); + if (!oldName.renameTo(newName)) { + throw new ProcessorStateException("Unable to rename old style segment from: " + + oldName + + " to new name: " + + newName); + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/d3c46ceb/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index 2dfc84b..bf3386d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -164,7 +164,7 @@ public class RocksDBSegmentedBytesStoreTest { bytesStore.close(); final String firstSegmentName = segments.segmentName(0); - final String[] nameParts = firstSegmentName.split(":"); + final String[] nameParts = firstSegmentName.split("\\."); final Long segmentId = Long.parseLong(nameParts[1]); final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm"); formatter.setTimeZone(new SimpleTimeZone(0, "UTC")); @@ -184,6 +184,30 @@ public class RocksDBSegmentedBytesStoreTest { KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L)))); } + @Test + public void shouldLoadSegementsWithOldStyleColonFormattedName() { + final Segments segments = new Segments(storeName, retention, numSegments); + final String key = "a"; + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L)); + bytesStore.close(); + + final String firstSegmentName = segments.segmentName(0); + final String[] nameParts = firstSegmentName.split("\\."); + final File parent = new File(stateDir, storeName); + final File oldStyleName = new File(parent, nameParts[0] + ":" + Long.parseLong(nameParts[1])); + assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName)); + + bytesStore = new RocksDBSegmentedBytesStore(storeName, + retention, + numSegments, + schema); + + bytesStore.init(context, bytesStore); + final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L)); + assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L)))); + } private Set<String> segmentDirs() { File windowDir = new File(stateDir, storeName); http://git-wip-us.apache.org/repos/asf/kafka/blob/d3c46ceb/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java index 0646b7e..65cfb21 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java @@ -28,7 +28,10 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; +import java.util.SimpleTimeZone; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -41,16 +44,19 @@ public class SegmentsTest { private MockProcessorContext context; private Segments segments; private long segmentInterval; + private File stateDirectory; + private String storeName = "test"; + private final int retentionPeriod = 4 * 60 * 1000; @Before public void createContext() { - context = new MockProcessorContext(TestUtils.tempDirectory(), + stateDirectory = TestUtils.tempDirectory(); + context = new MockProcessorContext(stateDirectory, Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); - int retentionPeriod = 4 * 60 * 1000; - segments = new Segments("test", retentionPeriod, NUM_SEGMENTS); + segments = new Segments(storeName, retentionPeriod, NUM_SEGMENTS); segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS); } @@ -78,9 +84,9 @@ public class SegmentsTest { @Test public void shouldGetSegmentNameFromId() throws Exception { - assertEquals("test:0", segments.segmentName(0)); - assertEquals("test:" + segmentInterval, segments.segmentName(1)); - assertEquals("test:" + 2 * segmentInterval, segments.segmentName(2)); + assertEquals("test.0", segments.segmentName(0)); + assertEquals("test." + segmentInterval, segments.segmentName(1)); + assertEquals("test." + 2 * segmentInterval, segments.segmentName(2)); } @Test @@ -88,9 +94,9 @@ public class SegmentsTest { final Segment segment1 = segments.getOrCreateSegment(0, context); final Segment segment2 = segments.getOrCreateSegment(1, context); final Segment segment3 = segments.getOrCreateSegment(2, context); - assertTrue(new File(context.stateDir(), "test/test:0").isDirectory()); - assertTrue(new File(context.stateDir(), "test/test:" + segmentInterval).isDirectory()); - assertTrue(new File(context.stateDir(), "test/test:" + 2 * segmentInterval).isDirectory()); + assertTrue(new File(context.stateDir(), "test/test.0").isDirectory()); + assertTrue(new File(context.stateDir(), "test/test." + segmentInterval).isDirectory()); + assertTrue(new File(context.stateDir(), "test/test." + 2 * segmentInterval).isDirectory()); assertEquals(true, segment1.isOpen()); assertEquals(true, segment2.isOpen()); assertEquals(true, segment3.isOpen()); @@ -100,7 +106,7 @@ public class SegmentsTest { public void shouldNotCreateSegmentThatIsAlreadyExpired() { segments.getOrCreateSegment(7, context); assertNull(segments.getOrCreateSegment(0, context)); - assertFalse(new File(context.stateDir(), "test/test:0").exists()); + assertFalse(new File(context.stateDir(), "test/test.0").exists()); } @Test @@ -111,9 +117,9 @@ public class SegmentsTest { assertFalse(segment1.isOpen()); assertFalse(segment2.isOpen()); assertTrue(segment3.isOpen()); - assertFalse(new File(context.stateDir(), "test/test:0").exists()); - assertFalse(new File(context.stateDir(), "test/test:" + segmentInterval).exists()); - assertTrue(new File(context.stateDir(), "test/test:" + 7 * segmentInterval).exists()); + assertFalse(new File(context.stateDir(), "test/test.0").exists()); + assertFalse(new File(context.stateDir(), "test/test." + segmentInterval).exists()); + assertTrue(new File(context.stateDir(), "test/test." + 7 * segmentInterval).exists()); } @Test @@ -203,6 +209,47 @@ public class SegmentsTest { verifyCorrectSegments(2, 5); } + @Test + public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception { + final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName; + final File storeDirectory = new File(storeDirectoryPath); + storeDirectory.mkdirs(); + + final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm"); + formatter.setTimeZone(new SimpleTimeZone(0, "UTC")); + + for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { + final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval))); + oldSegment.createNewFile(); + } + + segments.openExisting(context); + + for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { + final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1))); + assertTrue(newSegment.exists()); + } + } + + @Test + public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Exception { + final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName; + final File storeDirectory = new File(storeDirectoryPath); + storeDirectory.mkdirs(); + + for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { + final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1))); + oldSegment.createNewFile(); + } + + segments.openExisting(context); + + for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { + final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1))); + assertTrue(newSegment.exists()); + } + } + private void verifyCorrectSegments(final long first, final int numSegments) { final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE); assertEquals(numSegments, result.size());
