This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git

The following commit(s) were added to refs/heads/0.18.0 by this push:
     new b46d06a  check paths used for shuffle intermediary data manager get and delete (#9630) (#9640)
b46d06a is described below

commit b46d06abc3cc07909b9a52ecb18074937dbcf449
Author: Clint Wylie <cwy...@apache.org>
AuthorDate: Tue Apr 7 14:33:46 2020 -0700

    check paths used for shuffle intermediary data manager get and delete (#9630) (#9640)

    * check paths used for shuffle intermediary data manager get and delete

    * add test

    * newline

    * meh
---
 .../java/org/apache/druid/indexer/TaskIdUtils.java |  63 ++++++++++++
 .../org/apache/druid/indexer/TaskIdUtilsTest.java  | 111 +++++++++++++++++++++
 .../druid/indexing/kafka/KafkaConsumerConfigs.java |   4 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   4 +-
 .../kinesis/supervisor/KinesisSupervisor.java      |   4 +-
 .../druid/indexing/common/task/AbstractTask.java   |   4 +-
 .../indexing/common/task/utils/RandomIdUtils.java  |  34 -------
 .../supervisor/SeekableStreamSupervisor.java       |   6 +-
 .../indexing/worker/IntermediaryDataManager.java   |   3 +
 ...ermediaryDataManagerManualAddAndDeleteTest.java |  64 +++++++++++-
 .../tests/indexer/AbstractKafkaIndexerTest.java    |   4 +-
 server/pom.xml                                     |   6 ++
 .../apache/druid/segment/indexing/DataSchema.java  |  14 +--
 .../druid/segment/indexing/DataSchemaTest.java     |  66 ++++++++++--
 14 files changed, 316 insertions(+), 71 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
new file mode 100644
index 0000000..a88341b
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TaskIdUtils
+{
+  private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");
+
+  public static void validateId(String thingToValidate, String stringToValidate)
+  {
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(stringToValidate),
+        StringUtils.format("%s cannot be null or empty. Please provide a %s.", thingToValidate, thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.startsWith("."),
+        StringUtils.format("%s cannot start with the '.' character.", thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.contains("/"),
+        StringUtils.format("%s cannot contain the '/' character.", thingToValidate)
+    );
+    Matcher m = INVALIDCHARS.matcher(stringToValidate);
+    Preconditions.checkArgument(
+        !m.matches(),
+        StringUtils.format("%s cannot contain whitespace character except space.", thingToValidate)
+    );
+  }
+
+  public static String getRandomId()
+  {
+    final StringBuilder suffix = new StringBuilder(8);
+    for (int i = 0; i < Integer.BYTES * 2; ++i) {
+      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
+    }
+    return suffix.toString();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
new file mode 100644
index 0000000..5fed8fb
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TaskIdUtilsTest
+{
+  private static final String THINGO = "thingToValidate";
+  public static final String VALID_ID_CHARS = "alpha123..*~!@#&%^&*()-+ Россия\\ 한국 中国!";
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testValidIdName()
+  {
+    TaskIdUtils.validateId(THINGO, VALID_ID_CHARS);
+  }
+
+  @Test
+  public void testInvalidNull()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, null);
+  }
+
+  @Test
+  public void testInvalidEmpty()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, "");
+  }
+
+  @Test
+  public void testInvalidSlashes()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain the '/' character.");
+    TaskIdUtils.validateId(THINGO, "/paths/are/bad/since/we/make/files/from/stuff");
+  }
+
+  @Test
+  public void testInvalidLeadingDot()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot start with the '.' character.");
character."); + TaskIdUtils.validateId(THINGO, "./nice/try"); + } + + @Test + public void testInvalidSpacesRegexTabs() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "spaces\tare\tbetter\tthan\ttabs\twhich\tare\tillegal"); + } + + @Test + public void testInvalidSpacesRegexNewline() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "new\nline"); + } + + @Test + public void testInvalidSpacesRegexCarriageReturn() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "does\rexist\rby\ritself"); + } + + @Test + public void testInvalidSpacesRegexLineTabulation() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "wtf\u000Bis line tabulation"); + } + + @Test + public void testInvalidSpacesRegexFormFeed() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "form\u000cfeed?"); + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index 39174d5..7339d26 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -19,7 +19,7 @@ package org.apache.druid.indexing.kafka; -import org.apache.druid.indexing.common.task.utils.RandomIdUtils; +import org.apache.druid.indexer.TaskIdUtils; import org.apache.druid.java.util.common.StringUtils; import java.util.HashMap; @@ -35,7 +35,7 @@ public class KafkaConsumerConfigs { final Map<String, Object> props = new HashMap<>(); props.put("metadata.max.age.ms", "10000"); - props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId())); + props.put("group.id", StringUtils.format("kafka-supervisor-%s", TaskIdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); props.put("isolation.level", "read_committed"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index f5cec3d..ba12128 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -24,10 +24,10 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import org.apache.druid.indexer.TaskIdUtils; 
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; -import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata; import org.apache.druid.indexing.kafka.KafkaIndexTask; import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory; @@ -221,7 +221,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long> List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId()); + String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId()); taskList.add(new KafkaIndexTask( taskId, new TaskResource(baseSequenceName, 1), diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 13f94a5..c789fc7 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -25,10 +25,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import org.apache.druid.common.aws.AWSCredentialsConfig; +import org.apache.druid.indexer.TaskIdUtils; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; -import org.apache.druid.indexing.common.task.utils.RandomIdUtils; import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata; import org.apache.druid.indexing.kinesis.KinesisIndexTask; import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory; @@ -169,7 +169,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String> List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId()); + String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId()); taskList.add(new KinesisIndexTask( taskId, new TaskResource(baseSequenceName, 1), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index d47f900..40745bf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -24,11 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import org.apache.druid.indexer.TaskIdUtils; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import 
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -92,7 +92,7 @@ public abstract class AbstractTask implements Task
     }
 
     final List<Object> objects = new ArrayList<>();
-    final String suffix = RandomIdUtils.getRandomId();
+    final String suffix = TaskIdUtils.getRandomId();
     objects.add(typeName);
     objects.add(dataSource);
     objects.add(suffix);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
deleted file mode 100644
index a782b66..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.utils;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-public class RandomIdUtils
-{
-  public static String getRandomId()
-  {
-    final StringBuilder suffix = new StringBuilder(8);
-    for (int i = 0; i < Integer.BYTES * 2; ++i) {
-      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
-    }
-    return suffix.toString();
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 948b687..682a560 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -430,13 +430,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
-  // Map<{group RandomIdUtils}, {actively reading task group}>; see documentation for TaskGroup class
+  // Map<{group id}, {actively reading task group}>; see documentation for TaskGroup class
   private final ConcurrentHashMap<Integer, TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap<>();
 
   // After telling a taskGroup to stop reading and begin publishing a segment, it is moved from [activelyReadingTaskGroups] to here so
   // we can monitor its status while we queue new tasks to read the next range of sequences. This is a list since we could
   // have multiple sets of tasks publishing at once if time-to-publish > taskDuration.
-  // Map<{group RandomIdUtils}, List<{pending completion task groups}>>
+  // Map<{group id}, List<{pending completion task groups}>>
   private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
 
   // We keep two separate maps for tracking the current state of partition->task group mappings [partitionGroups] and partition->offset
@@ -998,7 +998,7 @@
       group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
       if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) {
         throw new ISE(
-            "trying to add taskGroup with RandomIdUtils [%s] to actively reading task groups, but group already exists.",
+            "trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.",
             taskGroupId
         );
       }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index b35b191..78090ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskStatus;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
@@ -336,6 +337,7 @@ public class IntermediaryDataManager
   @Nullable
   public File findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int partitionId)
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId));
       if (partitionDir.exists()) {
@@ -364,6 +366,7 @@
   public void deletePartitions(String supervisorTaskId) throws IOException
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
       if (supervisorTaskPath.exists()) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
index 1e1eab4..15aad92 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -51,11 +52,15 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
   public ExpectedException expectedException = ExpectedException.none();
 
   private IntermediaryDataManager intermediaryDataManager;
+  private File intermediarySegmentsLocation;
+  private File siblingLocation;
 
   @Before
   public void setup() throws IOException
   {
     final WorkerConfig workerConfig = new WorkerConfig();
+    intermediarySegmentsLocation = tempDir.newFolder();
+    siblingLocation = tempDir.newFolder();
     final TaskConfig taskConfig = new TaskConfig(
         null,
         null,
@@ -65,7 +70,7 @@
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null))
+        ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null))
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
     intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
@@ -157,6 +162,63 @@
     intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
   }
 
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForDelete() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final String someFile = "sneaky-snake.txt";
+    File dataFile = new File(siblingLocation, someFile);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+    intermediaryDataManager.deletePartitions(supervisorTaskId);
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+  }
+
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForFind() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final Interval interval = Intervals.of("2018/2019");
+    final int partitionId = 0;
+    final String intervalAndPart =
+        StringUtils.format("%s/%s/%s", interval.getStart().toString(), interval.getEnd().toString(), partitionId);
+
+    final String someFile = "sneaky-snake.txt";
+
+    final String someFilePath = intervalAndPart + "/" + someFile;
+
+    // can only traverse to find files that are in a path of the form {start}/{end}/{partitionId}, so write a data file
+    // in a location like that
+    File dataFile = new File(siblingLocation, someFilePath);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(
+        new File(intermediarySegmentsLocation, supervisorTaskId + "/" + someFilePath).exists());
+
+    final File foundFile1 = intermediaryDataManager.findPartitionFile(
+        supervisorTaskId,
+        someFile,
+        interval,
+        partitionId
+    );
+    Assert.assertNull(foundFile1);
+  }
+
   private File generateSegmentDir(String fileName) throws IOException
   {
     // Each file size is 138 bytes after compression
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
index 87a2cce..7ea8bd4 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
@@ -28,7 +28,7 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.IOUtils;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
@@ -161,7 +161,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
     properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
     if (txnEnabled) {
       properties.setProperty("enable.idempotence", "true");
-      properties.setProperty("transactional.id", RandomIdUtils.getRandomId());
+      properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
     }
 
     KafkaProducer<String, String> producer = new KafkaProducer<>(
diff --git a/server/pom.xml b/server/pom.xml
index 7bfa266..b2bab34 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -430,6 +430,12 @@
             <artifactId>equalsverifier</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-text</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 37009da..6be9a71 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -27,12 +27,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -45,7 +45,6 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -148,16 +147,7 @@ public class DataSchema
 
   private static void validateDatasourceName(String dataSource)
   {
-    Preconditions.checkArgument(
-        !Strings.isNullOrEmpty(dataSource),
-        "dataSource cannot be null or empty. Please provide a dataSource."
-    );
-    Matcher m = INVALIDCHARS.matcher(dataSource);
-    Preconditions.checkArgument(
-        !m.matches(),
-        "dataSource cannot contain whitespace character except space."
-    );
-    Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot contain the '/' character.");
+    TaskIdUtils.validateId("dataSource", dataSource);
   }
 
   private static DimensionsSpec computeDimensionsSpec(
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index de674ae..8ce50d3 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -21,14 +21,17 @@ package org.apache.druid.segment.indexing;
 
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.commons.text.StringEscapeUtils;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtilsTest;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
@@ -43,6 +46,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -54,9 +58,10 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
-public class DataSchemaTest
+public class DataSchemaTest extends InitializedNullHandlingTest
 {
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
@@ -79,7 +84,7 @@ public class DataSchemaTest
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
            new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -116,7 +121,7 @@
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
            new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -153,7 +158,7 @@
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parserMap,
         new AggregatorFactory[]{
            new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -211,7 +216,7 @@
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
            new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -244,7 +249,7 @@
     );
 
     DataSchema schema = new DataSchema(
-        "test",
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
            new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -262,7 +267,7 @@
   public void testSerdeWithInvalidParserMap() throws Exception
   {
     String jsonStr = "{"
-                     + "\"dataSource\":\"test\","
+                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
                      + "\"parser\":{\"type\":\"invalid\"},"
                      + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
                      + "\"granularitySpec\":{"
@@ -365,7 +370,7 @@
   public void testSerde() throws Exception
   {
     String jsonStr = "{"
-                     + "\"dataSource\":\"test\","
+                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
                      + "\"parser\":{"
                      + "\"type\":\"string\","
                      + "\"parseSpec\":{"
@@ -389,7 +394,7 @@
         DataSchema.class
     );
 
-    Assert.assertEquals(actual.getDataSource(), "test");
+    Assert.assertEquals(actual.getDataSource(), TaskIdUtilsTest.VALID_ID_CHARS);
     Assert.assertEquals(
         actual.getParser().getParseSpec(),
         new JSONParseSpec(
@@ -415,6 +420,45 @@
   }
 
   @Test
+  public void testSerializeWithInvalidDataSourceName() throws Exception
+  {
+    // Escape backslashes to insert a tab character in the datasource name.
+ List<String> datasources = ImmutableList.of("", "../invalid", "\tname", "name\t invalid"); + for (String datasource : datasources) { + String jsonStr = "{" + + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(datasource) + "\"," + + "\"parser\":{" + + "\"type\":\"string\"," + + "\"parseSpec\":{" + + "\"format\":\"json\"," + + "\"timestampSpec\":{\"column\":\"xXx\", \"format\": \"auto\", \"missingValue\": null}," + + "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[]}," + + "\"flattenSpec\":{\"useFieldDiscovery\":true, \"fields\":[]}," + + "\"featureSpec\":{}}," + + "\"encoding\":\"UTF-8\"" + + "}," + + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}]," + + "\"granularitySpec\":{" + + "\"type\":\"arbitrary\"," + + "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"}," + + "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}"; + try { + jsonMapper.readValue( + jsonMapper.writeValueAsString( + jsonMapper.readValue(jsonStr, DataSchema.class) + ), + DataSchema.class + ); + } + catch (ValueInstantiationException e) { + Assert.assertEquals(IllegalArgumentException.class, e.getCause().getClass()); + continue; + } + Assert.fail("Serialization of datasource " + datasource + " should have failed."); + } + } + + @Test public void testSerdeWithUpdatedDataSchemaAddedField() throws IOException { Map<String, Object> parser = jsonMapper.convertValue( @@ -430,7 +474,7 @@ public class DataSchemaTest ); DataSchema originalSchema = new DataSchema( - "test", + TaskIdUtilsTest.VALID_ID_CHARS, parser, new AggregatorFactory[]{ new DoubleSumAggregatorFactory("metric1", "col1"), @@ -469,7 +513,7 @@ public class DataSchemaTest ); TestModifiedDataSchema originalSchema = new TestModifiedDataSchema( - "test", + TaskIdUtilsTest.VALID_ID_CHARS, null, null, new AggregatorFactory[]{ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org