This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 49cedf95c5b Revert "YARN-11916. FileSystemTimelineReaderImpl vulnerable to race conditions (#8164)"
49cedf95c5b is described below
commit 49cedf95c5baab457d1cf058b9320f024c597b2f
Author: Steve Loughran <[email protected]>
AuthorDate: Wed Jan 7 15:24:51 2026 +0000
Revert "YARN-11916. FileSystemTimelineReaderImpl vulnerable to race conditions (#8164)"
This reverts commit 11e58a4f01534ff356e2a0d524747e2db2a19305.
---
.../hadoop-yarn/hadoop-yarn-client/pom.xml | 6 -
.../storage/FileSystemTimelineReaderImpl.java | 10 +-
.../storage/FileSystemTimelineWriterImpl.java | 101 ++--------
.../timelineservice/storage/TimelineWriter.java | 4 +-
.../storage/TestFileSystemTimelineWriterImpl.java | 205 +++++++++------------
5 files changed, 97 insertions(+), 229 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index fef007f9b35..606dd18b06a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -163,12 +163,6 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-timelineservice</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
similarity index 97%
rename from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
rename to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index d9bee2db93f..2e771fc77e8 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -39,8 +39,6 @@
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -63,16 +61,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;
-
/**
* File System based implementation for TimelineReader. This implementation
may
* not provide a complete implementation of all the necessary features. This
* implementation is provided solely for basic testing purposes, and should
not
* be used in a non-test situation.
*/
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
public class FileSystemTimelineReaderImpl extends AbstractService
implements TimelineReader {
@@ -170,9 +164,7 @@ private static void fillFields(TimelineEntity finalEntity,
private String getFlowRunPath(String userId, String clusterId,
String flowName, Long flowRunId, String appId) throws IOException {
if (userId != null && flowName != null && flowRunId != null) {
- return escape(userId) + File.separator
- + escape(flowName) + File.separator
- + "*" + File.separator + flowRunId;
+ return userId + File.separator + flowName + File.separator + "*" +
File.separator + flowRunId;
}
if (clusterId == null || appId == null) {
throw new IOException("Unable to get flow info");
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
similarity index 75%
rename from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
rename to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 3ecdea4e2f0..2f7007165a0 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
@@ -52,27 +51,7 @@
* This implements a FileSystem based backend for storing application timeline
* information. This implementation may not provide a complete implementation
of
* all the necessary features. This implementation is provided solely for basic
- * testing purposes, and MUST NOT be used in a non-test situation.
- * <p>
- * Key limitations are:
- * <ol>
- * <li>Inadequate scalability and concurrency for production use</li>
- * <li>Weak security: any authenticated caller can add events to any
application
- * timeline.</li>
- * </ol>
- * <p>
- * To implement an atomic append it reads all the data in the original file,
- * writes that to a temporary file, appends the new
- * data there and renames that temporary file to the original path.
- * This makes the update operation slower and slower the longer an application
runs.
- * If any other update comes in while an existing update is in progress,
- * it will read and append to the previous state of the log, losing all changes
- * from the ongoing transaction.
- * <p>
- * This is not a database. Apache HBase is. Use it.
- * <p>
- * The only realistic justification for this is if you are writing code which
updates
- * the timeline service and you want something easier to debug in unit tests.
+ * testing purposes, and should not be used in a non-test situation.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -110,16 +89,8 @@ public class FileSystemTimelineWriterImpl extends
AbstractService
private static final Logger LOG =
LoggerFactory.getLogger(FileSystemTimelineWriter.class);
- private static final Logger LOGIMPL =
- LoggerFactory.getLogger(FileSystemTimelineWriterImpl.class);
-
- public static final LogExactlyOnce WARNING_OF_USE =
- new LogExactlyOnce(LOGIMPL);
-
FileSystemTimelineWriterImpl() {
super((FileSystemTimelineWriterImpl.class.getName()));
- WARNING_OF_USE.warn("This timeline writer is neither safe nor scaleable
enough to"
- + " be used in production.");
}
@Override
@@ -155,19 +126,21 @@ private synchronized void writeInternal(String clusterId,
String userId,
TimelineEntity entity,
TimelineWriteResponse response)
throws IOException {
- final String entityTypePathStr =
- buildEntityTypeSubpath(clusterId, userId, flowName, flowVersion,
flowRun, appId, entity.getType());
+ String entityTypePathStr = clusterId + File.separator + userId +
+ File.separator + escape(flowName) + File.separator +
+ escape(flowVersion) + File.separator + flowRun + File.separator + appId
+ + File.separator + entity.getType();
Path entityTypePath = new Path(entitiesPath, entityTypePathStr);
try {
mkdirs(entityTypePath);
Path filePath =
new Path(entityTypePath,
- escape(entity.getId(), "id") +
TIMELINE_SERVICE_STORAGE_EXTENSION);
+ entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
createFileWithRetries(filePath);
- byte[] record =
- (TimelineUtils.dumpTimelineRecordtoJSON(entity) + "\n")
- .getBytes(StandardCharsets.UTF_8);
+ byte[] record = new StringBuilder()
+ .append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
+ .append("\n").toString().getBytes(StandardCharsets.UTF_8);
writeFileWithRetries(filePath, record);
} catch (Exception ioe) {
LOG.warn("Interrupted operation:{}", ioe.getMessage());
@@ -180,35 +153,6 @@ private synchronized void writeInternal(String clusterId,
String userId,
}
}
- /**
- * Given the various attributes of an entity, return the string subpath
- * of the directory.
- * @param clusterId cluster ID
- * @param userId user ID
- * @param flowName flow name
- * @param flowVersion flow version
- * @param flowRun flow run
- * @param appId application ID
- * @param type entity type
- * @return the subpath for records.
- */
- @VisibleForTesting
- public static String buildEntityTypeSubpath(final String clusterId,
- final String userId,
- final String flowName,
- final String flowVersion,
- final long flowRun,
- final String appId,
- final String type) {
- return clusterId
- + File.separator + userId
- + File.separator + escape(flowName, "")
- + File.separator + escape(flowVersion, "")
- + File.separator + flowRun
- + File.separator + escape(appId, "")
- + File.separator + escape(type, "type");
- }
-
private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
TimelineWriteError error = new TimelineWriteError();
error.setEntityId(entity.getId());
@@ -368,29 +312,8 @@ protected void writeFile(Path outputPath, byte[] data)
throws IOException {
}
}
- /**
- * Escape filesystem separator character and other URL-unfriendly chars.
- * @param str input string
- * @return a string with none of the escaped characters.
- */
- @VisibleForTesting
- public static String escape(String str) {
- return escape(str, "");
- }
-
- /**
- * Escape filesystem separator character and other URL-unfriendly chars.
- * Empty strings are mapped to a fallback string, which may itself be empty.
- * @param str input string
- * @param fallback fallback char
- * @return a string with none of the escaped characters.
- */
- @VisibleForTesting
- public static String escape(String str, final String fallback) {
- return str.isEmpty()
- ? fallback
- : str.replace(File.separatorChar, '_')
- .replace('?', '_')
- .replace(':', '_');
+ // specifically escape the separator character
+ private static String escape(String str) {
+ return str.replace(File.separatorChar, '_');
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index d6d7ec3d922..ccc74910377 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -73,8 +73,6 @@ TimelineWriteResponse write(TimelineCollectorContext context,
*
* Any errors occurring for individual write request objects will be reported
* in the response.
- *<p>
- * This is not invoked anywhere, tested and all implementations return null.
*
* @param data
* a {@link TimelineEntity} object
@@ -82,7 +80,7 @@ TimelineWriteResponse write(TimelineCollectorContext context,
* value.
* @param track Specifies the track or dimension along which aggregation
would
* occur. Includes USER, FLOW, QUEUE, etc.
- * @return a {@link TimelineWriteResponse} object. All implementations
return null.
+ * @return a {@link TimelineWriteResponse} object.
* @throws IOException if there is any exception encountered while
aggregating
* entities to the backend storage.
*/
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 8297f17e4b1..efed104eeea 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -27,17 +27,14 @@
import java.util.List;
import java.util.Map;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@@ -46,16 +43,11 @@
import
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
-import static
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.buildEntityTypeSubpath;
-import static
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;
-
-public class TestFileSystemTimelineWriterImpl extends AbstractHadoopTestBase {
- private static final Logger LOG =
- LoggerFactory.getLogger(TestFileSystemTimelineWriterImpl.class);
-
- public static final String UP = ".." + File.separator;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+public class TestFileSystemTimelineWriterImpl {
@TempDir
private File tmpFolder;
@@ -92,10 +84,12 @@ void testWriteEntityToFile() throws Exception {
te.addEntity(entity2);
Map<String, TimelineMetric> aggregatedMetrics =
- new HashMap<>();
+ new HashMap<String, TimelineMetric>();
aggregatedMetrics.put(metricId, metric);
- try (FileSystemTimelineWriterImpl fsi = new
FileSystemTimelineWriterImpl()) {
+ FileSystemTimelineWriterImpl fsi = null;
+ try {
+ fsi = new FileSystemTimelineWriterImpl();
Configuration conf = new YarnConfiguration();
String outputRoot = tmpFolder.getAbsolutePath();
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
@@ -111,43 +105,47 @@ void testWriteEntityToFile() throws Exception {
File.separator + "cluster_id" + File.separator + "user_id" +
File.separator + "flow_name" + File.separator + "flow_version" +
File.separator + "12345678" + File.separator + "app_id" +
- File.separator + type
- + File.separator + id +
+ File.separator + type + File.separator + id +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- List<String> data = readFromFile(FileSystem.get(conf), new
Path(fileName), 2);
+ Path path = new Path(fileName);
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue(fs.exists(path),
+ "Specified path(" + fileName + ") should exist: ");
+ FileStatus fileStatus = fs.getFileStatus(path);
+ assertFalse(fileStatus.isDirectory(), "Specified path should be a file");
+ List<String> data = readFromFile(fs, path);
// ensure there's only one entity + 1 new line
- Assertions.assertThat(data).hasSize(2);
+ assertEquals(2, data.size(), "data size is:" + data.size());
+ String d = data.get(0);
// confirm the contents same as what was written
- assertRecordMatches(data.get(0), entity);
+ assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
// verify aggregated metrics
String fileName2 = fsi.getOutputRoot() + File.separator + "entities" +
File.separator + "cluster_id" + File.separator + "user_id" +
File.separator + "flow_name" + File.separator + "flow_version" +
File.separator + "12345678" + File.separator + "app_id" +
- File.separator + type2
- + File.separator + id2 +
+ File.separator + type2 + File.separator + id2 +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path2 = new Path(fileName2);
- List<String> data2 = readFromFile(FileSystem.get(conf), path2, 2);
+ assertTrue(fs.exists(path2),
+ "Specified path(" + fileName + ") should exist: ");
+ FileStatus fileStatus2 = fs.getFileStatus(path2);
+ assertFalse(fileStatus2.isDirectory(), "Specified path should be a
file");
+ List<String> data2 = readFromFile(fs, path2);
// ensure there's only one entity + 1 new line
- Assertions.assertThat(data).hasSize(2);
+ assertEquals(2, data2.size(), "data size is:" + data2.size());
+ String metricToString = data2.get(0);
// confirm the contents same as what was written
- assertRecordMatches(data2.get(0), entity2);
+ assertEquals(metricToString,
+ TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+ } finally {
+ if (fsi != null) {
+ fsi.close();
+ }
}
}
- /**
- * Assert a read in string matches the json value of the entity
- * @param d record
- * @param entity expected
- */
- private static void assertRecordMatches(final String d, final TimelineEntity
entity)
- throws IOException {
- Assertions.assertThat(d)
- .isEqualTo(TimelineUtils.dumpTimelineRecordtoJSON(entity));
- }
-
@Test
void testWriteMultipleEntities() throws Exception {
String id = "appId";
@@ -167,7 +165,9 @@ void testWriteMultipleEntities() throws Exception {
entity2.setCreatedTime(1425016503000L);
te2.addEntity(entity2);
- try (FileSystemTimelineWriterImpl fsi = new
FileSystemTimelineWriterImpl()) {
+ FileSystemTimelineWriterImpl fsi = null;
+ try {
+ fsi = new FileSystemTimelineWriterImpl();
Configuration conf = new YarnConfiguration();
String outputRoot = tmpFolder.getAbsolutePath();
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
@@ -183,19 +183,33 @@ void testWriteMultipleEntities() throws Exception {
"flow_version", 12345678L, "app_id"),
te2, UserGroupInformation.createRemoteUser("user_id"));
- String fileName = outputRoot + File.separator + "entities"
- + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
- "flow_name" ,"flow_version" ,12345678, "app_id", type)
- + File.separator + id
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ String fileName = outputRoot + File.separator + "entities" +
+ File.separator + "cluster_id" + File.separator + "user_id" +
+ File.separator + "flow_name" + File.separator + "flow_version" +
+ File.separator + "12345678" + File.separator + "app_id" +
+ File.separator + type + File.separator + id +
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path = new Path(fileName);
FileSystem fs = FileSystem.get(conf);
- List<String> data = readFromFile(fs, path, 3);
+ assertTrue(fs.exists(path),
+ "Specified path(" + fileName + ") should exist: ");
+ FileStatus fileStatus = fs.getFileStatus(path);
+ assertFalse(fileStatus.isDirectory(), "Specified path should be a file");
+ List<String> data = readFromFile(fs, path);
+ assertEquals(3, data.size(), "data size is:" + data.size());
+ String d = data.get(0);
// confirm the contents same as what was written
- assertRecordMatches(data.get(0), entity);
+ assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+
+ String metricToString = data.get(1);
// confirm the contents same as what was written
- assertRecordMatches(data.get(1), entity2);
+ assertEquals(metricToString,
+ TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+ } finally {
+ if (fsi != null) {
+ fsi.close();
+ }
}
}
@@ -211,7 +225,9 @@ void testWriteEntitiesWithEmptyFlowName() throws Exception {
entity.setCreatedTime(1425016501000L);
te.addEntity(entity);
- try (FileSystemTimelineWriterImpl fsi = new
FileSystemTimelineWriterImpl()) {
+ FileSystemTimelineWriterImpl fsi = null;
+ try {
+ fsi = new FileSystemTimelineWriterImpl();
Configuration conf = new YarnConfiguration();
String outputRoot = tmpFolder.getAbsolutePath();
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
@@ -223,86 +239,32 @@ void testWriteEntitiesWithEmptyFlowName() throws
Exception {
"flow_version", 12345678L, "app_id"),
te, UserGroupInformation.createRemoteUser("user_id"));
- String fileName = outputRoot + File.separator + "entities"
- + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
- "" ,"flow_version" ,12345678, "app_id", type)
- + File.separator + id
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-
- List<String> data = readFromFile(FileSystem.get(conf), new
Path(fileName), 2);
- // confirm the contents same as what was written
- assertRecordMatches(data.get(0), entity);
- }
- }
-
- /**
- * Stress test the escaping logic.
- */
- @Test
- void testWriteEntitiesWithEscaping() throws Exception {
- String id = UP + "appid";
- String type = UP + "type";
-
- TimelineEntities te = new TimelineEntities();
- TimelineEntity entity = new TimelineEntity();
- entity.setId(id);
- entity.setType(type);
- entity.setCreatedTime(1425016501000L);
- te.addEntity(entity);
-
- try (FileSystemTimelineWriterImpl fsi = new
FileSystemTimelineWriterImpl()) {
- Configuration conf = new YarnConfiguration();
- String outputRoot = tmpFolder.getAbsolutePath();
- conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
- outputRoot);
- fsi.init(conf);
- fsi.start();
- final String flowName = UP + "flow_name?";
- final String flowVersion = UP + "flow_version/";
- fsi.write(
- new TimelineCollectorContext("cluster_id", "user_id", flowName,
- flowVersion, 12345678L, "app_id"),
- te, UserGroupInformation.createRemoteUser("user_id"));
-
- String fileName = outputRoot + File.separator + "entities"
- + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
- flowName, flowVersion,12345678, "app_id", type)
- + File.separator + escape(id, "id")
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-
- List<String> data = readFromFile(FileSystem.get(conf), new
Path(fileName), 2);
+ String fileName = outputRoot + File.separator + "entities" +
+ File.separator + "cluster_id" + File.separator + "user_id" +
+ File.separator + "" + File.separator + "flow_version" +
+ File.separator + "12345678" + File.separator + "app_id" +
+ File.separator + type + File.separator + id +
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ Path path = new Path(fileName);
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue(fs.exists(path),
+ "Specified path(" + fileName + ") should exist: ");
+ FileStatus fileStatus = fs.getFileStatus(path);
+ assertFalse(fileStatus.isDirectory(), "specified path should be a file");
+ List<String> data = readFromFile(fs, path);
+ assertEquals(2, data.size(), "data size is:" + data.size());
+ String d = data.get(0);
// confirm the contents same as what was written
- assertRecordMatches(data.get(0), entity);
+ assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ } finally {
+ if (fsi != null) {
+ fsi.close();
+ }
}
}
- /**
- * Test escape downgrades file separators and inserts the fallback on a null
input.
- */
- @Test
- public void testEscapingAndFallback() throws Throwable {
- Assertions.assertThat(escape("", "fallback"))
- .isEqualTo("fallback");
- Assertions.assertThat(escape(File.separator, "fallback"))
- .isEqualTo("_");
- Assertions.assertThat(escape("?:", ""))
- .isEqualTo("__");
- }
-
- /**
- * Read a file line by line, logging its name first and verifying it is
actually a file.
- * Asserts the number of lines read is as expected.
- * @param fs fs
- * @param path path
- * @param entryCount number of entries expected.
- * @return a possibly empty list of lines
- * @throws IOException IO failure
- */
- private List<String> readFromFile(FileSystem fs, Path path, int entryCount)
+ private List<String> readFromFile(FileSystem fs, Path path)
throws IOException {
-
- LOG.info("Reading file from {}", path);
- assertIsFile(fs, path);
BufferedReader br = new BufferedReader(
new InputStreamReader(fs.open(path)));
List<String> data = new ArrayList<>();
@@ -312,7 +274,6 @@ private List<String> readFromFile(FileSystem fs, Path path,
int entryCount)
line = br.readLine();
data.add(line);
}
- Assertions.assertThat(data).hasSize(entryCount);
return data;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]