[ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897495#comment-15897495 ]
ASF GitHub Bot commented on FLINK-1579: --------------------------------------- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104431499 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java --- @@ -205,215 +125,18 @@ public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Except } } - /** - * Response when running with leading JobManager. - */ - private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest request, String requestPath) - throws IOException, ParseException, URISyntaxException { - - // convert to absolute path - final File file = new File(rootPath, requestPath); - - if (!file.exists()) { - // file does not exist. Try to load it with the classloader - ClassLoader cl = StaticFileServerHandler.class.getClassLoader(); - - try(InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) { - boolean success = false; - try { - if (resourceStream != null) { - URL root = cl.getResource("web"); - URL requested = cl.getResource("web" + requestPath); - - if (root != null && requested != null) { - URI rootURI = new URI(root.getPath()).normalize(); - URI requestedURI = new URI(requested.getPath()).normalize(); - - // Check that we don't load anything from outside of the - // expected scope. - if (!rootURI.relativize(requestedURI).equals(requestedURI)) { - logger.debug("Loading missing file from classloader: {}", requestPath); - // ensure that directory to file exists. 
- file.getParentFile().mkdirs(); - Files.copy(resourceStream, file.toPath()); - - success = true; - } - } - } - } catch (Throwable t) { - logger.error("error while responding", t); - } finally { - if (!success) { - logger.debug("Unable to load requested file {} from classloader", requestPath); - sendError(ctx, NOT_FOUND); - return; - } - } - } - } - - if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) { - sendError(ctx, NOT_FOUND); - return; - } - - if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) { - sendError(ctx, NOT_FOUND); - return; - } - - // cache validation - final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE); - if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) { - SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); - Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince); - - // Only compare up to the second because the datetime format we send to the client - // does not have milliseconds - long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000; - long fileLastModifiedSeconds = file.lastModified() / 1000; - if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) { - if (logger.isDebugEnabled()) { - logger.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\''); - } - - sendNotModified(ctx); - return; - } - } - - if (logger.isDebugEnabled()) { - logger.debug("Responding with file '" + file.getAbsolutePath() + '\''); - } - - // Don't need to close this manually. Netty's DefaultFileRegion will take care of it. 
- final RandomAccessFile raf; - try { - raf = new RandomAccessFile(file, "r"); - } - catch (FileNotFoundException e) { - sendError(ctx, NOT_FOUND); - return; - } - long fileLength = raf.length(); - - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - setContentTypeHeader(response, file); - - // since the log and out files are rapidly changing, we don't want to browser to cache them - if (!(requestPath.contains("log") || requestPath.contains("out"))) { - setDateAndCacheHeaders(response, file); - } - if (HttpHeaders.isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - HttpHeaders.setContentLength(response, fileLength); - - // write the initial line and the header. - ctx.write(response); - - // write the content. - ChannelFuture lastContentFuture; - if (ctx.pipeline().get(SslHandler.class) == null) { - ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); - lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + @Override + public String preProcessRequestPath(String requestPath) { + // in case the files being accessed are logs or stdout files, find appropriate paths. + if (requestPath.equals("/jobmanager/log") || requestPath.equals("/jobmanager/stdout")) { + return ""; } else { - lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), - ctx.newProgressivePromise()); - // HttpChunkedInput will write the end marker (LastHttpContent) for us. 
- } - - // close the connection, if no keep-alive is needed - if (!HttpHeaders.isKeepAlive(request)) { - lastContentFuture.addListener(ChannelFutureListener.CLOSE); + return requestPath; } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - if (ctx.channel().isActive()) { - logger.error("Caught exception", cause); - sendError(ctx, INTERNAL_SERVER_ERROR); - } - } - - // ------------------------------------------------------------------------ - // Utilities to encode headers and responses - // ------------------------------------------------------------------------ - - /** - * Writes a simple error response message. - * - * @param ctx The channel context to write the response to. - * @param status The response status. - */ - private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { - FullHttpResponse response = new DefaultFullHttpResponse( - HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8)); - response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); - - // close the connection as soon as the error message is sent. - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); - } - - /** - * Send the "304 Not Modified" response. This response can be used when the - * file timestamp is the same as what the browser is sending up. - * - * @param ctx The channel context to write the response to. - */ - private static void sendNotModified(ChannelHandlerContext ctx) { - FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED); - setDateHeader(response); - - // close the connection as soon as the error message is sent. - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); - } - - /** - * Sets the "date" header for the HTTP response. 
- * - * @param response HTTP response - */ - private static void setDateHeader(FullHttpResponse response) { - SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); - dateFormatter.setTimeZone(GMT_TIMEZONE); - - Calendar time = new GregorianCalendar(); - response.headers().set(DATE, dateFormatter.format(time.getTime())); - } - - /** - * Sets the "date" and "cache" headers for the HTTP Response. - * - * @param response The HTTP response object. - * @param fileToCache File to extract the modification timestamp from. - */ - private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) { - SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); - dateFormatter.setTimeZone(GMT_TIMEZONE); - - // date header - Calendar time = new GregorianCalendar(); - response.headers().set(DATE, dateFormatter.format(time.getTime())); - - // cache headers - time.add(Calendar.SECOND, HTTP_CACHE_SECONDS); - response.headers().set(EXPIRES, dateFormatter.format(time.getTime())); - response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS); - response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified()))); - } - - /** - * Sets the content type header for the HTTP Response. - * - * @param response HTTP response - * @param file file to extract content type - */ - private static void setContentTypeHeader(HttpResponse response, File file) { - String mimeType = MimeTypes.getMimeTypeForFileName(file.getName()); - String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType(); - response.headers().set(CONTENT_TYPE, mimeFinal); + protected boolean shouldCache(String requestPath) { + return !(requestPath.contains("log") || requestPath.contains("out")); --- End diff -- Should we make this more explicit in order to prevent accidental non-caching of requests that contain out or log for another reason? 
> Create a Flink History Server > ----------------------------- > > Key: FLINK-1579 > URL: https://issues.apache.org/jira/browse/FLINK-1579 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination > Affects Versions: 0.9 > Reporter: Robert Metzger > Assignee: Chesnay Schepler > > Right now it's not possible to analyze the job results for jobs that ran on > YARN, because we'll lose the information once the JobManager has stopped. > Therefore, I propose to implement a "Flink History Server" which serves the > results from these jobs. > I haven't started thinking about the implementation, but I suspect it > involves some JSON files stored in HDFS :) -- This message was sent by Atlassian JIRA (v6.3.15#6346)