This is an automated email from the ASF dual-hosted git repository. himanshug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new ab5b3be Add shuffleSegmentPusher for data shuffle (#8115) ab5b3be is described below commit ab5b3be6c61d7be540f85385327a6a6489b363b1 Author: Jihoon Son <jihoon...@apache.org> AuthorDate: Mon Aug 5 13:38:35 2019 -0700 Add shuffleSegmentPusher for data shuffle (#8115) * Fix race between canHandle() and addSegment() in StorageLocation * add comment * Add shuffleSegmentPusher which is a dataSegmentPusher used for writing shuffle data in local storage. * add comments * unused import * add comments * fix test * address comments * remove <p> tag from javadoc * address comments * comparingLong * Address comments * fix test --- .../druid/indexing/common/config/TaskConfig.java | 5 + .../indexing/worker/IntermediaryDataManager.java | 166 +++++++++++++-------- .../indexing/worker/ShuffleDataSegmentPusher.java | 77 ++++++++++ .../IntermediaryDataManagerAutoCleanupTest.java | 15 +- ...ermediaryDataManagerManualAddAndDeleteTest.java | 47 +++--- .../worker/ShuffleDataSegmentPusherTest.java | 138 +++++++++++++++++ .../druid/segment/loading/StorageLocation.java | 7 +- ...mentLoaderLocalCacheManagerConcurrencyTest.java | 2 - .../druid/segment/loading/StorageLocationTest.java | 14 +- 9 files changed, 366 insertions(+), 105 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 31405fe..52bf083 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -126,6 +126,11 @@ public class TaskConfig return new File(getTaskDir(taskId), "work"); } + public File getTaskTempDir(String taskId) + { + return new File(getTaskDir(taskId), "temp"); + } + public File getTaskLockFile(String taskId) { return new File(getTaskDir(taskId), "lock"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java index c47425a..95acb22 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java @@ -33,11 +33,13 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.loading.StorageLocation; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CompressionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -45,6 +47,7 @@ import org.joda.time.Period; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; @@ -57,12 +60,14 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * This class manages intermediary segments for data shuffle between native parallel index tasks. - * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers + * In native parallel indexing, phase 1 tasks store segment files in local storage of middleManagers (or indexer) * and phase 2 tasks read those files via HTTP. * * The directory where segment files are placed is structured as @@ -75,11 +80,12 @@ import java.util.stream.Collectors; @ManageLifecycle public class IntermediaryDataManager { - private static final Logger log = new Logger(IntermediaryDataManager.class); + private static final Logger LOG = new Logger(IntermediaryDataManager.class); private final long intermediaryPartitionDiscoveryPeriodSec; private final long intermediaryPartitionCleanupPeriodSec; private final Period intermediaryPartitionTimeout; + private final TaskConfig taskConfig; private final List<StorageLocation> shuffleDataLocations; private final IndexingServiceClient indexingServiceClient; @@ -108,6 +114,7 @@ public class IntermediaryDataManager this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec(); this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec(); this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout(); + this.taskConfig = taskConfig; this.shuffleDataLocations = taskConfig .getShuffleDataLocations() .stream() @@ -119,6 +126,7 @@ public class IntermediaryDataManager @LifecycleStart public void start() { + discoverSupervisorTaskPartitions(); supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d"); // Discover partitions for new supervisorTasks supervisorTaskChecker.scheduleAtFixedRate( @@ -127,7 +135,7 @@ public class IntermediaryDataManager discoverSupervisorTaskPartitions(); } catch (Exception e) { - log.warn(e, "Error while discovering supervisorTasks"); + LOG.warn(e, "Error while discovering supervisorTasks"); } }, intermediaryPartitionDiscoveryPeriodSec, @@ -141,10 +149,10 @@ public class IntermediaryDataManager deleteExpiredSuprevisorTaskPartitionsIfNotRunning(); } catch (InterruptedException e) { - log.error(e, "Error while cleaning up partitions for expired supervisors"); + LOG.error(e, "Error while cleaning up partitions for expired supervisors"); } catch (Exception e) { - log.warn(e, "Error while cleaning up partitions for expired supervisors"); + LOG.warn(e, "Error while cleaning up partitions for expired supervisors"); } }, intermediaryPartitionCleanupPeriodSec, @@ -163,9 +171,13 @@ public class IntermediaryDataManager supervisorTaskCheckTimes.clear(); } + /** + * IntermediaryDataManager periodically calls this method after it starts up to search for unknown intermediary data. + */ private void discoverSupervisorTaskPartitions() { for (StorageLocation location : shuffleDataLocations) { + final Path locationPath = location.getPath().toPath().toAbsolutePath(); final MutableInt numDiscovered = new MutableInt(0); final File[] dirsPerSupervisorTask = location.getPath().listFiles(); if (dirsPerSupervisorTask != null) { @@ -174,13 +186,32 @@ public class IntermediaryDataManager supervisorTaskCheckTimes.computeIfAbsent( supervisorTaskId, k -> { + for (File eachFile : FileUtils.listFiles(supervisorTaskDir, null, true)) { + final String relativeSegmentPath = locationPath + .relativize(eachFile.toPath().toAbsolutePath()) + .toString(); + // StorageLocation keeps track of how much storage capacity is being used. + // Newly found files should be known to the StorageLocation to keep it up to date. + final File reservedFile = location.reserve( + relativeSegmentPath, + eachFile.getName(), + eachFile.length() + ); + if (reservedFile == null) { + LOG.warn("Can't add a discovered partition[%s]", eachFile.getAbsolutePath()); + } + } numDiscovered.increment(); return DateTimes.nowUtc().plus(intermediaryPartitionTimeout); } ); } } - log.info("Discovered partitions for [%s] new supervisor tasks", numDiscovered.getValue()); + LOG.info( + "Discovered partitions for [%s] new supervisor tasks under location[%s]", + numDiscovered.getValue(), + location.getPath() + ); } } @@ -203,7 +234,7 @@ public class IntermediaryDataManager } } - log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size()); + LOG.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size()); final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks); for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) { @@ -215,7 +246,7 @@ public class IntermediaryDataManager deletePartitions(supervisorTaskId); } catch (IOException e) { - log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId); + LOG.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId); } } else { // If it's still running, update last access time. @@ -227,17 +258,74 @@ public class IntermediaryDataManager /** * Write a segment into one of configured locations. The location to write is chosen in a round-robin manner per * supervisorTaskId. - * - * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should the static - * addSegment method. */ - public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile) + long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) + throws IOException { + // Get or create the location iterator for supervisorTask. final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent( supervisorTaskId, - k -> Iterators.cycle(shuffleDataLocations) + k -> { + final Iterator<StorageLocation> cyclicIterator = Iterators.cycle(shuffleDataLocations); + // Random start of the iterator + final int random = ThreadLocalRandom.current().nextInt(shuffleDataLocations.size()); + IntStream.range(0, random).forEach(i -> cyclicIterator.next()); + return cyclicIterator; + } ); - addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile); + + // Create a zipped segment in a temp directory. + final File taskTempDir = taskConfig.getTaskTempDir(subTaskId); + + try (final Closer resourceCloser = Closer.create()) { + if (taskTempDir.mkdirs()) { + resourceCloser.register(() -> { + try { + FileUtils.forceDelete(taskTempDir); + } + catch (IOException e) { + LOG.warn(e, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath()); + } + }); + } + + // Tempary compressed file. Will be removed when taskTempDir is deleted. + final File tempZippedFile = new File(taskTempDir, segment.getId().toString()); + final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile); + if (unzippedSizeBytes == 0) { + throw new IOE( + "Read 0 bytes from segmentDir[%s]", + segmentDir.getAbsolutePath() + ); + } + + // Try copying the zipped segment to one of storage locations + for (int i = 0; i < shuffleDataLocations.size(); i++) { + final StorageLocation location = iterator.next(); + final String partitionFilePath = getPartitionFilePath( + supervisorTaskId, + subTaskId, + segment.getInterval(), + segment.getShardSpec().getPartitionNum() + ); + final File destFile = location.reserve(partitionFilePath, segment.getId().toString(), tempZippedFile.length()); + if (destFile != null) { + FileUtils.forceMkdirParent(destFile); + org.apache.druid.java.util.common.FileUtils.writeAtomically( + destFile, + out -> Files.asByteSource(tempZippedFile).copyTo(out) + ); + LOG.info( + "Wrote intermediary segment for segment[%s] of subtask[%s] at [%s]", + segment.getId(), + subTaskId, + destFile + ); + return unzippedSizeBytes; + } + } + throw new ISE("Can't find location to handle segment[%s]", segment); + } } public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId) @@ -259,7 +347,7 @@ public class IntermediaryDataManager for (StorageLocation location : shuffleDataLocations) { final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId); if (supervisorTaskPath.exists()) { - log.info("Cleaning up [%s]", supervisorTaskPath); + LOG.info("Cleaning up [%s]", supervisorTaskPath); for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) { location.removeFile(eachFile); } @@ -269,54 +357,6 @@ public class IntermediaryDataManager supervisorTaskCheckTimes.remove(supervisorTaskId); } - /** - * Iterate through the given storage locations to find one which can handle the given segment. - */ - public static void addSegment( - Iterator<StorageLocation> cyclicIterator, - int numLocations, - String supervisorTaskId, - String subTaskId, - DataSegment segment, - File segmentFile - ) - { - for (int i = 0; i < numLocations; i++) { - final StorageLocation location = cyclicIterator.next(); - final File destFile = location.reserve( - getPartitionFilePath( - supervisorTaskId, - subTaskId, - segment.getInterval(), - segment.getShardSpec().getPartitionNum() - ), - segment.getId(), - segmentFile.length() - ); - if (destFile != null) { - try { - FileUtils.forceMkdirParent(destFile); - final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile)); - if (copiedBytes == 0) { - throw new IOE( - "0 bytes copied after copying a segment file from [%s] to [%s]", - segmentFile.getAbsolutePath(), - destFile.getAbsolutePath() - ); - } else { - return; - } - } - catch (IOException e) { - // Only log here to try other locations as well. - log.warn(e, "Failed to write segmentFile at [%s]", destFile); - location.removeFile(segmentFile); - } - } - } - throw new ISE("Can't find location to handle segment[%s]", segment); - } - private static String getPartitionFilePath( String supervisorTaskId, String subTaskId, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java new file mode 100644 index 0000000..fcbdf9d --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker; + +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.timeline.DataSegment; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.Map; + +/** + * DataSegmentPusher used for storing intermediary data in local storage during data shuffle of native parallel + * indexing. + */ +public class ShuffleDataSegmentPusher implements DataSegmentPusher +{ + private final String supervisorTaskId; + private final String subTaskId; + private final IntermediaryDataManager intermediaryDataManager; + + public ShuffleDataSegmentPusher( + String supervisorTaskId, + String subTaskId, + IntermediaryDataManager intermediaryDataManager + ) + { + this.supervisorTaskId = supervisorTaskId; + this.subTaskId = subTaskId; + this.intermediaryDataManager = intermediaryDataManager; + } + + @Override + public String getPathForHadoop(String dataSource) + { + throw new UnsupportedOperationException(); + } + + @Override + public String getPathForHadoop() + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException + { + final long unzippedSize = intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file); + return segment.withSize(unzippedSize) + .withBinaryVersion(SegmentUtils.getVersionFromDir(file)); + } + + @Override + public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath) + { + throw new UnsupportedOperationException(); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java index c158c0e..6a5f611 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.worker; -import com.amazonaws.util.StringUtils; import com.google.common.collect.ImmutableList; import org.apache.commons.io.FileUtils; import org.apache.druid.client.indexing.IndexingServiceClient; @@ -43,6 +42,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -116,19 +116,20 @@ public class IntermediaryDataManagerAutoCleanupTest { final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); - final File segmentFile = generateSegmentFile(); + final File segmentFile = generateSegmentDir("test"); final DataSegment segment = newSegment(interval, 0); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile); - Thread.sleep(8000); + Thread.sleep(3000); Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, 0).isEmpty()); } - private File generateSegmentFile() throws IOException + private File generateSegmentDir(String fileName) throws IOException { - final File segmentFile = tempDir.newFile(); - FileUtils.write(segmentFile, "test data.", StringUtils.UTF8); - return segmentFile; + // Each file size is 138 bytes after compression + final File segmentDir = tempDir.newFolder(); + FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8); + return segmentDir; } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java index 7bd9e90..7d322d0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.worker; -import com.amazonaws.util.StringUtils; import com.google.common.collect.ImmutableList; import org.apache.commons.io.FileUtils; import org.apache.druid.client.indexing.IndexingServiceClient; @@ -41,6 +40,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.List; @@ -67,7 +67,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest false, null, null, - ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 150L, null)) + ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null)) ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); @@ -83,15 +83,16 @@ public class IntermediaryDataManagerManualAddAndDeleteTest @Test public void testAddSegmentFailure() throws IOException { - for (int i = 0; i < 15; i++) { - File segmentFile = generateSegmentFile(); + int i = 0; + for (; i < 4; i++) { + File segmentFile = generateSegmentDir("file_" + i); DataSegment segment = newSegment(Intervals.of("2018/2019"), i); intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); } expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Can't find location to handle segment"); - File segmentFile = generateSegmentFile(); - DataSegment segment = newSegment(Intervals.of("2018/2019"), 16); + File segmentFile = generateSegmentDir("file_" + i); + DataSegment segment = newSegment(Intervals.of("2018/2019"), 4); intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); } @@ -101,15 +102,15 @@ public class IntermediaryDataManagerManualAddAndDeleteTest final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); final int partitionId = 0; - for (int i = 0; i < 10; i++) { - final File segmentFile = generateSegmentFile(); + for (int i = 0; i < 4; i++) { + final File segmentFile = generateSegmentDir("file_" + i); final DataSegment segment = newSegment(interval, partitionId); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile); } final List<File> files = intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId); - Assert.assertEquals(10, files.size()); + Assert.assertEquals(4, files.size()); files.sort(Comparator.comparing(File::getName)); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 4; i++) { Assert.assertEquals("subTaskId_" + i, files.get(i).getName()); } } @@ -119,9 +120,9 @@ public class IntermediaryDataManagerManualAddAndDeleteTest { final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); - for (int partitionId = 0; partitionId < 5; partitionId++) { - for (int subTaskId = 0; subTaskId < 3; subTaskId++) { - final File segmentFile = generateSegmentFile(); + for (int partitionId = 0; partitionId < 2; partitionId++) { + for (int subTaskId = 0; subTaskId < 2; subTaskId++) { + final File segmentFile = generateSegmentDir("file_" + partitionId + "_" + subTaskId); final DataSegment segment = newSegment(interval, partitionId); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + subTaskId, segment, segmentFile); } @@ -129,7 +130,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest intermediaryDataManager.deletePartitions(supervisorTaskId); - for (int partitionId = 0; partitionId < 5; partitionId++) { + for (int partitionId = 0; partitionId < 2; partitionId++) { Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId).isEmpty()); } } @@ -139,22 +140,24 @@ public class IntermediaryDataManagerManualAddAndDeleteTest { final String supervisorTaskId = "supervisorTaskId"; final Interval interval = Intervals.of("2018/2019"); - for (int i = 0; i < 15; i++) { - File segmentFile = generateSegmentFile(); + int i = 0; + for (; i < 4; i++) { + File segmentFile = generateSegmentDir("file_" + i); DataSegment segment = newSegment(interval, i); intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile); } intermediaryDataManager.deletePartitions(supervisorTaskId); - File segmentFile = generateSegmentFile(); - DataSegment segment = newSegment(interval, 16); + File segmentFile = generateSegmentDir("file_" + i); + DataSegment segment = newSegment(interval, i); intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile); } - private File generateSegmentFile() throws IOException + private File generateSegmentDir(String fileName) throws IOException { - final File segmentFile = tempDir.newFile(); - FileUtils.write(segmentFile, "test data.", StringUtils.UTF8); - return segmentFile; + // Each file size is 138 bytes after compression + final File segmentDir = tempDir.newFolder(); + FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8); + return segmentDir; } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java new file mode 100644 index 0000000..c3dfcdd --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.worker; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; +import com.google.common.primitives.Ints; +import org.apache.commons.io.FileUtils; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.FileUtils.FileCopyResult; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.apache.druid.utils.CompressionUtils; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +public class ShuffleDataSegmentPusherTest +{ + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private IntermediaryDataManager intermediaryDataManager; + private ShuffleDataSegmentPusher segmentPusher; + + @Before + public void setup() throws IOException + { + final WorkerConfig workerConfig = new WorkerConfig(); + final TaskConfig taskConfig = new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)) + ); + final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); + intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); + intermediaryDataManager.start(); + segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager); + } + + @After + public void teardown() throws InterruptedException + { + intermediaryDataManager.stop(); + } + + @Test + public void testPush() throws IOException + { + final File segmentDir = generateSegmentDir(); + final DataSegment segment = newSegment(Intervals.of("2018/2019")); + final DataSegment pushed = segmentPusher.push(segmentDir, segment, true); + + Assert.assertEquals(9, pushed.getBinaryVersion().intValue()); + Assert.assertEquals(14, pushed.getSize()); // 10 bytes data + 4 bytes version + + final List<File> files = intermediaryDataManager.findPartitionFiles( + "supervisorTaskId", + segment.getInterval(), + segment.getShardSpec().getPartitionNum() + ); + Assert.assertEquals(1, files.size()); + final File zippedSegment = files.get(0); + final File tempDir = temporaryFolder.newFolder(); + final FileCopyResult result = CompressionUtils.unzip(zippedSegment, tempDir); + final List<File> unzippedFiles = new ArrayList<>(result.getFiles()); + unzippedFiles.sort(Comparator.comparing(File::getName)); + final File dataFile = unzippedFiles.get(0); + Assert.assertEquals("test", dataFile.getName()); + Assert.assertEquals("test data.", Files.readFirstLine(dataFile, StandardCharsets.UTF_8)); + final File versionFile = unzippedFiles.get(1); + Assert.assertEquals("version.bin", versionFile.getName()); + Assert.assertArrayEquals(Ints.toByteArray(0x9), Files.toByteArray(versionFile)); + } + + private File generateSegmentDir() throws IOException + { + // Each file size is 138 bytes after compression + final File segmentDir = temporaryFolder.newFolder(); + Files.asByteSink(new File(segmentDir, "version.bin")).write(Ints.toByteArray(0x9)); + FileUtils.write(new File(segmentDir, "test"), "test data.", StandardCharsets.UTF_8); + return segmentDir; + } + + private DataSegment newSegment(Interval interval) + { + return new DataSegment( + "dataSource", + interval, + "version", + null, + null, + null, + new NumberedShardSpec(0, 0), + 9, + 0 + ); + } +} diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index 842295e..6ed86a3 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.FileUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -115,7 +114,7 @@ public class StorageLocation @Nullable public synchronized File reserve(String segmentDir, DataSegment segment) { - return reserve(segmentDir, segment.getId(), segment.getSize()); + return reserve(segmentDir, segment.getId().toString(), segment.getSize()); } /** @@ -124,7 +123,7 @@ public class StorageLocation * Returns null otherwise. */ @Nullable - public synchronized File reserve(String segmentFilePathToAdd, SegmentId segmentId, long segmentSize) + public synchronized File reserve(String segmentFilePathToAdd, String segmentId, long segmentSize) { final File segmentFileToAdd = new File(path, segmentFilePathToAdd); if (files.contains(segmentFileToAdd)) { @@ -145,7 +144,7 @@ public class StorageLocation */ @VisibleForTesting @GuardedBy("this") - boolean canHandle(SegmentId segmentId, long segmentSize) + boolean canHandle(String segmentId, long segmentSize) { if (availableSizeBytes() < segmentSize) { log.warn( diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java index cb08217..f617261 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerConcurrencyTest.java @@ -153,8 +153,6 @@ public class SegmentLoaderLocalCacheManagerConcurrencyTest for (Future future : futures) { future.get(); } - - System.out.println(manager.getLocations().get(0).availableSizeBytes()); } private DataSegment newSegment(Interval interval, int partitionId) diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java index 23da422..7be5328 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java @@ -39,18 +39,18 @@ public class StorageLocationTest { // free space ignored only maxSize matters StorageLocation locationPlain = fakeLocation(100_000, 5_000, 10_000, null); - Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013"), 9_000)); - Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013"), 11_000)); + Assert.assertTrue(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationPlain.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // enough space available maxSize is the limit StorageLocation locationFree = fakeLocation(100_000, 25_000, 10_000, 10.0); - Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013"), 9_000)); - Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013"), 11_000)); + Assert.assertTrue(locationFree.canHandle(newSegmentId("2012/2013").toString(), 9_000)); + Assert.assertFalse(locationFree.canHandle(newSegmentId("2012/2013").toString(), 11_000)); // disk almost full percentage is the limit StorageLocation locationFull = fakeLocation(100_000, 15_000, 10_000, 10.0); - Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013"), 4_000)); - Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013"), 6_000)); + Assert.assertTrue(locationFull.canHandle(newSegmentId("2012/2013").toString(), 4_000)); + Assert.assertFalse(locationFull.canHandle(newSegmentId("2012/2013").toString(), 6_000)); } private StorageLocation fakeLocation(long total, long free, long max, Double percent) @@ -99,7 +99,7 @@ public class StorageLocationTest { Assert.assertEquals(maxSize, loc.availableSizeBytes()); for (int i = 0; i <= maxSize; ++i) { - Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014"), i)); + Assert.assertTrue(String.valueOf(i), loc.canHandle(newSegmentId("2013/2014").toString(), i)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org