[ https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718736#comment-17718736 ]
ASF GitHub Bot commented on PARQUET-2276: ----------------------------------------- Fokko commented on code in PR #1084: URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183080135 ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java: ########## @@ -46,41 +83,91 @@ public class HadoopStreams { */ public static SeekableInputStream wrap(FSDataInputStream stream) { Objects.requireNonNull(stream, "Cannot wrap a null input stream"); - if (isWrappedStreamByteBufferReadable(stream)) { - return new H2SeekableInputStream(stream); - } else { - return new H1SeekableInputStream(stream); + + // Try to check using hasCapabilities(str) + Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadableHasCapabilities(stream); + + // If it is null, then fall back to the old method + if (hasCapabilitiesResult != null) { + if (hasCapabilitiesResult) { + return new H2SeekableInputStream(stream); + } else { + return new H1SeekableInputStream(stream); + } + } + + return isWrappedStreamByteBufferReadableLegacy(stream); + } + + /** + * Is the inner stream byte buffer readable? + * The test is 'the stream is not FSDataInputStream + * and implements ByteBufferReadable' + * + * This logic is only used for Hadoop <2.9.x, and <3.x.x + * + * @param stream stream to probe + * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable + */ + private static SeekableInputStream isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) { + InputStream wrapped = stream.getWrappedStream(); + if (wrapped instanceof FSDataInputStream) { + LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); + return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) wrapped)); + } + if (byteBufferReadableClass != null && h2SeekableConstructor != null && + byteBufferReadableClass.isInstance(stream.getWrappedStream())) { + try { + return h2SeekableConstructor.newInstance(stream); + } catch (InstantiationException | IllegalAccessException e) { + LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e); + } catch (InvocationTargetException e) { + throw new ParquetDecodingException( + "Could not instantiate H2SeekableInputStream", e.getTargetException()); + } } + return new H1SeekableInputStream(stream); } /** * Is the inner stream byte buffer readable? - * The test is "the stream is not FSDataInputStream + * The test is 'the stream is not FSDataInputStream * and implements ByteBufferReadable' * * That is: all streams which implement ByteBufferReadable - * other than FSDataInputStream successfuly support read(ByteBuffer). - * This is true for all filesytem clients the hadoop codebase. + * other than FSDataInputStream successfully support read(ByteBuffer). + * This is true for all filesystem clients the hadoop codebase. * * In hadoop 3.3.0+, the StreamCapabilities probe can be used to * check this: only those streams which provide the read(ByteBuffer) * semantics MAY return true for the probe "in:readbytebuffer"; * FSDataInputStream will pass the probe down to the underlying stream. * * @param stream stream to probe - * @return true if it is safe to a H2SeekableInputStream to access the data + * @return true if it is safe to a H2SeekableInputStream to access + * the data, null when it cannot be determined */ - private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) { - if (stream.hasCapability("in:readbytebuffer")) { - // stream is issuing the guarantee that it implements the - // API. Holds for all implementations in hadoop-* - // since Hadoop 3.3.0 (HDFS-14111). - return true; + private static Boolean isWrappedStreamByteBufferReadableHasCapabilities(FSDataInputStream stream) { + Method methodHasCapabilities; + try { + methodHasCapabilities = stream.getClass().getMethod("hasCapability", String.class); Review Comment: Added it > ParquetReader reads do not work with Hadoop version 2.8.5 > --------------------------------------------------------- > > Key: PARQUET-2276 > URL: https://issues.apache.org/jira/browse/PARQUET-2276 > Project: Parquet > Issue Type: Bug > Components: parquet-mr > Affects Versions: 1.13.0 > Reporter: Atul Mohan > Assignee: Fokko Driesprong > Priority: Major > Fix For: 1.14.0, 1.13.1 > > > {{ParquetReader.read() fails with the following exception on parquet-mr > version 1.13.0 when using hadoop version 2.8.5:}} > {code:java} > java.lang.NoSuchMethodError: 'boolean > org.apache.hadoop.fs.FSDataInputStream.hasCapability(java.lang.String)' > at > org.apache.parquet.hadoop.util.HadoopStreams.isWrappedStreamByteBufferReadable(HadoopStreams.java:74) > > at org.apache.parquet.hadoop.util.HadoopStreams.wrap(HadoopStreams.java:49) > at > org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69) > > at > org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:787) > > at > org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) > at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:162) > org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) > {code} > > > > From an initial investigation, it looks like HadoopStreams has started using > [FSDataInputStream.hasCapability|https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java#L74] > but _FSDataInputStream_ does not have the _hasCapability_ API in [hadoop > 2.8.x|https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FSDataInputStream.html]. -- This message was sent by Atlassian Jira (v8.20.10#820010)