This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 49cedf95c5b Revert "YARN-11916. FileSystemTimelineReaderImpl vulnerable to race conditions (#8164)"
49cedf95c5b is described below

commit 49cedf95c5baab457d1cf058b9320f024c597b2f
Author: Steve Loughran <[email protected]>
AuthorDate: Wed Jan 7 15:24:51 2026 +0000

    Revert "YARN-11916. FileSystemTimelineReaderImpl vulnerable to race conditions (#8164)"
    
    This reverts commit 11e58a4f01534ff356e2a0d524747e2db2a19305.
---
 .../hadoop-yarn/hadoop-yarn-client/pom.xml         |   6 -
 .../storage/FileSystemTimelineReaderImpl.java      |  10 +-
 .../storage/FileSystemTimelineWriterImpl.java      | 101 ++--------
 .../timelineservice/storage/TimelineWriter.java    |   4 +-
 .../storage/TestFileSystemTimelineWriterImpl.java  | 205 +++++++++------------
 5 files changed, 97 insertions(+), 229 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index fef007f9b35..606dd18b06a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -163,12 +163,6 @@
       <scope>test</scope>
       <type>test-jar</type>
        </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
     <groupId>org.apache.hadoop</groupId>
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
similarity index 97%
rename from 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
rename to 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index d9bee2db93f..2e771fc77e8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -39,8 +39,6 @@
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,16 +61,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;
-
 /**
  *  File System based implementation for TimelineReader. This implementation 
may
  *  not provide a complete implementation of all the necessary features. This
  *  implementation is provided solely for basic testing purposes, and should 
not
  *  be used in a non-test situation.
  */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
 public class FileSystemTimelineReaderImpl extends AbstractService
     implements TimelineReader {
 
@@ -170,9 +164,7 @@ private static void fillFields(TimelineEntity finalEntity,
   private String getFlowRunPath(String userId, String clusterId,
       String flowName, Long flowRunId, String appId) throws IOException {
     if (userId != null && flowName != null && flowRunId != null) {
-      return escape(userId) + File.separator
-          + escape(flowName) + File.separator
-          + "*" + File.separator + flowRunId;
+      return userId + File.separator + flowName + File.separator + "*" + 
File.separator + flowRunId;
     }
     if (clusterId == null || appId == null) {
       throw new IOException("Unable to get flow info");
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
similarity index 75%
rename from 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
rename to 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 3ecdea4e2f0..2f7007165a0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -29,7 +29,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
@@ -52,27 +51,7 @@
  * This implements a FileSystem based backend for storing application timeline
  * information. This implementation may not provide a complete implementation 
of
  * all the necessary features. This implementation is provided solely for basic
- * testing purposes, and MUST NOT be used in a non-test situation.
- * <p>
- *   Key limitations are:
- *   <ol>
- *     <li>Inadequate scalability and concurrency for production use</li>
- *     <li>Weak security: any authenticated caller can add events to any 
application
- *         timeline.</li>
- *   </ol>
- * <p>
- * To implement an atomic append it reads all the data in the original file,
- * writes that to a temporary file, appends the new
- * data there and renames that temporary file to the original path.
- * This makes the update operation slower and slower the longer an application 
runs.
- * If any other update comes in while an existing update is in progress,
- * it will read and append to the previous state of the log, losing all changes
- * from the ongoing transaction.
- * <p>
- * This is not a database. Apache HBase is. Use it.
- * <p>
- * The only realistic justification for this is if you are writing code which 
updates
- * the timeline service and you want something easier to debug in unit tests.
+ * testing purposes, and should not be used in a non-test situation.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -110,16 +89,8 @@ public class FileSystemTimelineWriterImpl extends 
AbstractService
   private static final Logger LOG =
           LoggerFactory.getLogger(FileSystemTimelineWriter.class);
 
-  private static final Logger LOGIMPL =
-      LoggerFactory.getLogger(FileSystemTimelineWriterImpl.class);
-
-  public static final LogExactlyOnce WARNING_OF_USE =
-      new LogExactlyOnce(LOGIMPL);
-
   FileSystemTimelineWriterImpl() {
     super((FileSystemTimelineWriterImpl.class.getName()));
-    WARNING_OF_USE.warn("This timeline writer is neither safe nor scaleable 
enough to"
-        + " be used in production.");
   }
 
   @Override
@@ -155,19 +126,21 @@ private synchronized void writeInternal(String clusterId, 
String userId,
                                           TimelineEntity entity,
                                           TimelineWriteResponse response)
                                           throws IOException {
-    final String entityTypePathStr =
-        buildEntityTypeSubpath(clusterId, userId, flowName, flowVersion, 
flowRun, appId, entity.getType());
+    String entityTypePathStr = clusterId + File.separator + userId +
+        File.separator + escape(flowName) + File.separator +
+        escape(flowVersion) + File.separator + flowRun + File.separator + appId
+        + File.separator + entity.getType();
     Path entityTypePath = new Path(entitiesPath, entityTypePathStr);
     try {
       mkdirs(entityTypePath);
       Path filePath =
               new Path(entityTypePath,
-                      escape(entity.getId(), "id") + 
TIMELINE_SERVICE_STORAGE_EXTENSION);
+                      entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
       createFileWithRetries(filePath);
 
-      byte[] record =
-          (TimelineUtils.dumpTimelineRecordtoJSON(entity) + "\n")
-              .getBytes(StandardCharsets.UTF_8);
+      byte[] record =  new StringBuilder()
+              .append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
+              .append("\n").toString().getBytes(StandardCharsets.UTF_8);
       writeFileWithRetries(filePath, record);
     } catch (Exception ioe) {
       LOG.warn("Interrupted operation:{}", ioe.getMessage());
@@ -180,35 +153,6 @@ private synchronized void writeInternal(String clusterId, 
String userId,
     }
   }
 
-  /**
-   * Given the various attributes of an entity, return the string subpath
-   * of the directory.
-   * @param clusterId cluster ID
-   * @param userId user ID
-   * @param flowName flow name
-   * @param flowVersion flow version
-   * @param flowRun flow run
-   * @param appId application ID
-   * @param type entity type
-   * @return the subpath for records.
-   */
-  @VisibleForTesting
-  public static String buildEntityTypeSubpath(final String clusterId,
-      final String userId,
-      final String flowName,
-      final String flowVersion,
-      final long flowRun,
-      final String appId,
-      final String type) {
-    return clusterId
-        + File.separator + userId
-        + File.separator + escape(flowName, "")
-        + File.separator + escape(flowVersion, "")
-        + File.separator + flowRun
-        + File.separator + escape(appId, "")
-        + File.separator + escape(type, "type");
-  }
-
   private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
     TimelineWriteError error = new TimelineWriteError();
     error.setEntityId(entity.getId());
@@ -368,29 +312,8 @@ protected void writeFile(Path outputPath, byte[] data) 
throws IOException {
     }
   }
 
-  /**
-   * Escape filesystem separator character and other URL-unfriendly chars.
-   * @param str input string
-   * @return a string with none of the escaped characters.
-   */
-  @VisibleForTesting
-  public static String escape(String str) {
-    return escape(str, "");
-  }
-
-  /**
-   * Escape filesystem separator character and other URL-unfriendly chars.
-   * Empty strings are mapped to a fallback string, which may itself be empty.
-   * @param str input string
-   * @param fallback fallback char
-   * @return a string with none of the escaped characters.
-   */
-  @VisibleForTesting
-  public static String escape(String str, final String fallback) {
-    return str.isEmpty()
-        ? fallback
-        : str.replace(File.separatorChar, '_')
-            .replace('?', '_')
-            .replace(':', '_');
+  // specifically escape the separator character
+  private static String escape(String str) {
+    return str.replace(File.separatorChar, '_');
   }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index d6d7ec3d922..ccc74910377 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -73,8 +73,6 @@ TimelineWriteResponse write(TimelineCollectorContext context,
    *
    * Any errors occurring for individual write request objects will be reported
    * in the response.
-   *<p>
-   * This is not invoked anywhere, tested and all implementations return null.
    *
    * @param data
    *          a {@link TimelineEntity} object
@@ -82,7 +80,7 @@ TimelineWriteResponse write(TimelineCollectorContext context,
    *          value.
    * @param track Specifies the track or dimension along which aggregation 
would
    *     occur. Includes USER, FLOW, QUEUE, etc.
-   * @return a {@link TimelineWriteResponse} object. All implementations 
return null.
+   * @return a {@link TimelineWriteResponse} object.
    * @throws IOException if there is any exception encountered while 
aggregating
    *     entities to the backend storage.
    */
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 8297f17e4b1..efed104eeea 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -27,17 +27,14 @@
 import java.util.List;
 import java.util.Map;
 
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.AbstractHadoopTestBase;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@@ -46,16 +43,11 @@
 import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
-import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
-import static 
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.buildEntityTypeSubpath;
-import static 
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;
-
-public class TestFileSystemTimelineWriterImpl extends AbstractHadoopTestBase {
-  private static final Logger LOG =
-          LoggerFactory.getLogger(TestFileSystemTimelineWriterImpl.class);
-
-  public static final String UP = ".." + File.separator;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+public class TestFileSystemTimelineWriterImpl {
   @TempDir
   private File tmpFolder;
 
@@ -92,10 +84,12 @@ void testWriteEntityToFile() throws Exception {
     te.addEntity(entity2);
 
     Map<String, TimelineMetric> aggregatedMetrics =
-        new HashMap<>();
+        new HashMap<String, TimelineMetric>();
     aggregatedMetrics.put(metricId, metric);
 
-    try (FileSystemTimelineWriterImpl fsi = new 
FileSystemTimelineWriterImpl()) {
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
       Configuration conf = new YarnConfiguration();
       String outputRoot = tmpFolder.getAbsolutePath();
       conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
@@ -111,43 +105,47 @@ void testWriteEntityToFile() throws Exception {
           File.separator + "cluster_id" + File.separator + "user_id" +
           File.separator + "flow_name" + File.separator + "flow_version" +
           File.separator + "12345678" + File.separator + "app_id" +
-          File.separator + type
-          + File.separator + id +
+          File.separator + type + File.separator + id +
           FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      List<String> data = readFromFile(FileSystem.get(conf), new 
Path(fileName), 2);
+      Path path = new Path(fileName);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue(fs.exists(path),
+          "Specified path(" + fileName + ") should exist: ");
+      FileStatus fileStatus = fs.getFileStatus(path);
+      assertFalse(fileStatus.isDirectory(), "Specified path should be a file");
+      List<String> data = readFromFile(fs, path);
       // ensure there's only one entity + 1 new line
-      Assertions.assertThat(data).hasSize(2);
+      assertEquals(2, data.size(), "data size is:" + data.size());
+      String d = data.get(0);
       // confirm the contents same as what was written
-      assertRecordMatches(data.get(0), entity);
+      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
 
       // verify aggregated metrics
       String fileName2 = fsi.getOutputRoot() + File.separator + "entities" +
           File.separator + "cluster_id" + File.separator + "user_id" +
           File.separator + "flow_name" + File.separator + "flow_version" +
           File.separator + "12345678" + File.separator + "app_id" +
-          File.separator + type2
-          + File.separator + id2 +
+          File.separator + type2 + File.separator + id2 +
           FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
       Path path2 = new Path(fileName2);
-      List<String> data2 = readFromFile(FileSystem.get(conf), path2, 2);
+      assertTrue(fs.exists(path2),
+          "Specified path(" + fileName + ") should exist: ");
+      FileStatus fileStatus2 = fs.getFileStatus(path2);
+      assertFalse(fileStatus2.isDirectory(), "Specified path should be a 
file");
+      List<String> data2 = readFromFile(fs, path2);
       // ensure there's only one entity + 1 new line
-      Assertions.assertThat(data).hasSize(2);
+      assertEquals(2, data2.size(), "data size is:" + data2.size());
+      String metricToString = data2.get(0);
       // confirm the contents same as what was written
-      assertRecordMatches(data2.get(0), entity2);
+      assertEquals(metricToString,
+          TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+    } finally {
+      if (fsi != null) {
+        fsi.close();
+      }
     }
   }
 
-  /**
-   * Assert a read in string matches the json value of the entity
-   * @param d record
-   * @param entity expected
-   */
-  private static void assertRecordMatches(final String d, final TimelineEntity 
entity)
-      throws IOException {
-    Assertions.assertThat(d)
-        .isEqualTo(TimelineUtils.dumpTimelineRecordtoJSON(entity));
-  }
-
   @Test
   void testWriteMultipleEntities() throws Exception {
     String id = "appId";
@@ -167,7 +165,9 @@ void testWriteMultipleEntities() throws Exception {
     entity2.setCreatedTime(1425016503000L);
     te2.addEntity(entity2);
 
-    try (FileSystemTimelineWriterImpl fsi = new 
FileSystemTimelineWriterImpl()) {
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
       Configuration conf = new YarnConfiguration();
       String outputRoot = tmpFolder.getAbsolutePath();
       conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
@@ -183,19 +183,33 @@ void testWriteMultipleEntities() throws Exception {
               "flow_version", 12345678L, "app_id"),
           te2, UserGroupInformation.createRemoteUser("user_id"));
 
-      String fileName = outputRoot + File.separator + "entities"
-          + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
-          "flow_name" ,"flow_version" ,12345678, "app_id", type)
-          + File.separator + id
-          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      String fileName = outputRoot + File.separator + "entities" +
+          File.separator + "cluster_id" + File.separator + "user_id" +
+          File.separator + "flow_name" + File.separator + "flow_version" +
+          File.separator + "12345678" + File.separator + "app_id" +
+          File.separator + type + File.separator + id +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
       Path path = new Path(fileName);
       FileSystem fs = FileSystem.get(conf);
-      List<String> data = readFromFile(fs, path, 3);
+      assertTrue(fs.exists(path),
+          "Specified path(" + fileName + ") should exist: ");
+      FileStatus fileStatus = fs.getFileStatus(path);
+      assertFalse(fileStatus.isDirectory(), "Specified path should be a file");
+      List<String> data = readFromFile(fs, path);
+      assertEquals(3, data.size(), "data size is:" + data.size());
+      String d = data.get(0);
       // confirm the contents same as what was written
-      assertRecordMatches(data.get(0), entity);
+      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+
 
+      String metricToString = data.get(1);
       // confirm the contents same as what was written
-      assertRecordMatches(data.get(1), entity2);
+      assertEquals(metricToString,
+          TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+    } finally {
+      if (fsi != null) {
+        fsi.close();
+      }
     }
   }
 
@@ -211,7 +225,9 @@ void testWriteEntitiesWithEmptyFlowName() throws Exception {
     entity.setCreatedTime(1425016501000L);
     te.addEntity(entity);
 
-    try (FileSystemTimelineWriterImpl fsi = new 
FileSystemTimelineWriterImpl()) {
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
       Configuration conf = new YarnConfiguration();
       String outputRoot = tmpFolder.getAbsolutePath();
       conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
@@ -223,86 +239,32 @@ void testWriteEntitiesWithEmptyFlowName() throws 
Exception {
               "flow_version", 12345678L, "app_id"),
           te, UserGroupInformation.createRemoteUser("user_id"));
 
-      String fileName = outputRoot + File.separator + "entities"
-          + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
-          "" ,"flow_version" ,12345678, "app_id", type)
-          + File.separator + id
-          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-
-      List<String> data = readFromFile(FileSystem.get(conf), new 
Path(fileName), 2);
-      // confirm the contents same as what was written
-      assertRecordMatches(data.get(0), entity);
-    }
-  }
-
-  /**
-   * Stress test the escaping logic.
-   */
-  @Test
-  void testWriteEntitiesWithEscaping() throws Exception {
-    String id = UP + "appid";
-    String type = UP + "type";
-
-    TimelineEntities te = new TimelineEntities();
-    TimelineEntity entity = new TimelineEntity();
-    entity.setId(id);
-    entity.setType(type);
-    entity.setCreatedTime(1425016501000L);
-    te.addEntity(entity);
-
-    try (FileSystemTimelineWriterImpl fsi = new 
FileSystemTimelineWriterImpl()) {
-      Configuration conf = new YarnConfiguration();
-      String outputRoot = tmpFolder.getAbsolutePath();
-      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
-          outputRoot);
-      fsi.init(conf);
-      fsi.start();
-      final String flowName = UP + "flow_name?";
-      final String flowVersion = UP + "flow_version/";
-      fsi.write(
-          new TimelineCollectorContext("cluster_id", "user_id", flowName,
-              flowVersion, 12345678L, "app_id"),
-          te, UserGroupInformation.createRemoteUser("user_id"));
-
-      String fileName = outputRoot + File.separator + "entities"
-          + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
-          flowName, flowVersion,12345678, "app_id", type)
-          + File.separator + escape(id, "id")
-          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-
-      List<String> data = readFromFile(FileSystem.get(conf), new 
Path(fileName), 2);
+      String fileName = outputRoot + File.separator + "entities" +
+          File.separator + "cluster_id" + File.separator + "user_id" +
+          File.separator + "" + File.separator + "flow_version" +
+          File.separator + "12345678" + File.separator + "app_id" +
+          File.separator + type + File.separator + id +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path = new Path(fileName);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue(fs.exists(path),
+          "Specified path(" + fileName + ") should exist: ");
+      FileStatus fileStatus = fs.getFileStatus(path);
+      assertFalse(fileStatus.isDirectory(), "specified path should be a file");
+      List<String> data = readFromFile(fs, path);
+      assertEquals(2, data.size(), "data size is:" + data.size());
+      String d = data.get(0);
       // confirm the contents same as what was written
-      assertRecordMatches(data.get(0), entity);
+      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+    } finally {
+      if (fsi != null) {
+        fsi.close();
+      }
     }
   }
 
-  /**
-   * Test escape downgrades file separators and inserts the fallback on a null 
input.
-   */
-  @Test
-  public void testEscapingAndFallback() throws Throwable {
-    Assertions.assertThat(escape("", "fallback"))
-        .isEqualTo("fallback");
-    Assertions.assertThat(escape(File.separator, "fallback"))
-        .isEqualTo("_");
-    Assertions.assertThat(escape("?:", ""))
-        .isEqualTo("__");
-  }
-
-  /**
-   * Read a file line by line, logging its name first and verifying it is 
actually a file.
-   * Asserts the number of lines read is as expected.
-   * @param fs fs
-   * @param path path
-   * @param entryCount number of entries expected.
-   * @return a possibly empty list of lines
-   * @throws IOException IO failure
-   */
-  private List<String> readFromFile(FileSystem fs, Path path, int entryCount)
+  private List<String> readFromFile(FileSystem fs, Path path)
           throws IOException {
-
-    LOG.info("Reading file from {}", path);
-    assertIsFile(fs, path);
     BufferedReader br = new BufferedReader(
             new InputStreamReader(fs.open(path)));
     List<String> data = new ArrayList<>();
@@ -312,7 +274,6 @@ private List<String> readFromFile(FileSystem fs, Path path, 
int entryCount)
       line = br.readLine();
       data.add(line);
     }
-    Assertions.assertThat(data).hasSize(entryCount);
     return data;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to