[ https://issues.apache.org/jira/browse/BEAM-9399?focusedWorklogId=401548&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-401548 ]
ASF GitHub Bot logged work on BEAM-9399: ---------------------------------------- Author: ASF GitHub Bot Created on: 11/Mar/20 16:03 Start Date: 11/Mar/20 16:03 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream URL: https://github.com/apache/beam/pull/11096#discussion_r391071339 ########## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java ########## @@ -37,114 +37,272 @@ class JulHandlerPrintStreamAdapterFactory { private static final AtomicBoolean outputWarning = new AtomicBoolean(false); - /** - * Creates a {@link PrintStream} which redirects all output to the JUL {@link Handler} with the - * specified {@code loggerName} and {@code level}. - */ - static PrintStream create(Handler handler, String loggerName, Level messageLevel) { - try { - return new PrintStream( - new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel), - false, - StandardCharsets.UTF_8.name()); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - - /** - * An output stream adapter which is able to take a stream of UTF-8 data and output to a named JUL - * log handler. The log messages will be buffered until the system dependent new line separator is - * seen, at which point the buffered string will be output. - */ - private static class JulHandlerAdapterOutputStream extends OutputStream { + private static class JulHandlerPrintStream extends PrintStream { private static final String LOGGING_DISCLAIMER = String.format( "Please use a logger instead of System.out or System.err.%n" + "Please switch to using org.slf4j.Logger.%n" + "See: https://cloud.google.com/dataflow/pipelines/logging"); - // This limits the number of bytes which we buffer in case we don't see a newline character. 
- private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes - private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8); + // This limits the number of bytes which we buffer in case we don't have a flush. + private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars /** Hold reference of named logger to check configured {@link Level}. */ private Logger logger; private Handler handler; private String loggerName; - private ByteArrayOutputStream baos; + private StringBuilder buffer; private Level messageLevel; - private int matched = 0; - private JulHandlerAdapterOutputStream(Handler handler, String loggerName, Level logLevel) { + private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) { + super( + new OutputStream() { + @Override + public void write(int i) throws IOException { + throw new RuntimeException("All methods should be overwritten so this is unused"); + } + }); this.handler = handler; this.loggerName = loggerName; this.messageLevel = logLevel; this.logger = Logger.getLogger(loggerName); - this.baos = new ByteArrayOutputStream(BUFFER_LIMIT); + this.buffer = new StringBuilder(BUFFER_LIMIT); } @Override - public void write(int b) { - if (outputWarning.compareAndSet(false, true)) { - publish(Level.WARNING, LOGGING_DISCLAIMER); + public void flush() { + publish(flushToString()); + } + + private synchronized String flushToString() { + if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') { + buffer.setLength(buffer.length() - 1); } + String result = buffer.toString(); + buffer.setLength(0); + return result; + } - baos.write(b); - // Check to see if the next byte matches further into new line string. - if (NEW_LINE[matched] == b) { - matched += 1; - // If we have matched the entire new line, output the contents of the buffer. 
- if (matched == NEW_LINE.length) { - output(); - } - } else { - // Reset the match - matched = 0; + @Override + public void close() { + flush(); + } + + @Override + public boolean checkError() { + return false; + } + + @Override + public synchronized void write(int i) { + buffer.append(i); + } + + @Override + public void write(byte[] a, int start, int limit) { + // XXX this enforces decoding on boundaries where before it didn't, does that matter? Review comment: It would be an issue for multi-byte wide characters that are split. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 401548) Time Spent: 50m (was: 40m) > Possible deadlock between DataflowWorkerLoggingHandler and overridden > System.err PrintStream > -------------------------------------------------------------------------------------------- > > Key: BEAM-9399 > URL: https://issues.apache.org/jira/browse/BEAM-9399 > Project: Beam > Issue Type: Bug > Components: runner-dataflow > Reporter: Sam Whittle > Assignee: Sam Whittle > Priority: Minor > Time Spent: 50m > Remaining Estimate: 0h > > When an exception is encountered in DataflowWorkerLoggingHandler the > ErrorManager is used to log the exception. ErrorManager uses System.err > which is overridden to be a PrintStream that writes back into > DataflowWorkerLoggingHandler. > This has the lock ordering DataflowWorkerLoggingHandler -> PrintStream. > Other logging of System.err has the inverse lock ordering > PrintStream->DataflowWorkerLoggingHandler so there is potential for deadlock. > This is one known cause of the inversion, but any other System.err logs from > inside DataflowWorkerLoggingHandler could cause the same issue. 
> The proposed fix is to address the low-hanging fruit by having ErrorManager output to > the original System.err. A full fix would be to replace our System.err override with > a PrintStream that can detect the lock-ordering inversion, or possibly to use the > PrintStream mutex in both cases. -- This message was sent by Atlassian Jira (v8.3.4#803005)