[jira] [Commented] (PARQUET-2193) Encrypting only one field in nested field prevents reading of other fields in nested field without keys

2023-05-02 Thread Gidon Gershinsky (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718795#comment-17718795
 ] 

Gidon Gershinsky commented on PARQUET-2193:
---

Yep, sorry about the delay. This turned out to be more challenging than I
hoped; a fix at the encryption code level will require some changes in the
format specification. A rather big deal, and likely unjustified in this case.
The immediate trigger is the `checkDeltaByteArrayProblem` verification, added 8
years ago to detect encoding irregularities in older files. For some reason
this check is done only on files with nested columns, and not on files with
regular columns (at least in Spark). Maybe the right thing today is to remove
that verification. I'll check with the community.
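For readers following along: that check walks the metadata of every column chunk in a row group and asks for its encodings, and with plaintext-footer encryption that metadata access fails for any column the reader holds no key for, which is exactly the "Null File Decryptor" error in the stack trace quoted below. A rough, simplified sketch of the shape of that verification (approximate, not the exact parquet-mr source):

{code:java}
import java.util.HashSet;
import java.util.Set;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.ParquetDecodingException;

// Approximate sketch of the PARQUET-246 safeguard performed by
// ParquetRecordReader.checkDeltaByteArrayProblem; names are simplified.
class DeltaByteArrayCheckSketch {
  static void checkDeltaByteArrayProblem(String createdBy, BlockMetaData block) {
    Set<Encoding> encodings = new HashSet<>();
    for (ColumnChunkMetaData column : block.getColumns()) {
      // getEncodings() needs the column-chunk metadata; for an encrypted
      // column read without a key this is where the
      // "Null File Decryptor" ParquetCryptoRuntimeException is raised.
      encodings.addAll(column.getEncodings());
    }
    for (Encoding encoding : encodings) {
      if (CorruptDeltaByteArrays.requiresSequentialReads(createdBy, encoding)) {
        throw new ParquetDecodingException(
            "File written by a parquet-mr version affected by PARQUET-246; cannot split reads safely");
      }
    }
  }
}
{code}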

> Encrypting only one field in nested field prevents reading of other fields in 
> nested field without keys
> ---
>
> Key: PARQUET-2193
> URL: https://issues.apache.org/jira/browse/PARQUET-2193
> Project: Parquet
>  Issue Type: New Feature
>  Components: parquet-mr
>Affects Versions: 1.12.0
>Reporter: Vignesh Nageswaran
>Priority: Major
>
> Hi Team,
> While exploring parquet encryption, I found that if a field in a nested
> column is encrypted, and I want to read this parquet directory from other
> applications that do not have the encryption keys to decrypt it, I cannot
> read the remaining fields of the nested column without keys.
> Example:
> {code:java}
> case class nestedItem(ic: Int = 0, sic: Double, pc: Int = 0)
> case class SquareItem(int_column: Int, square_int_column: Double,
>   partitionCol: Int, nestedCol: nestedItem)
> {code}
> In the case class `SquareItem`, the `nestedCol` field is a nested field, and
> I want to encrypt the field `ic` within it.
>  
> I also want the footer to be non-encrypted (plaintext footer mode), so that
> legacy applications can still use the encrypted parquet file.
>  
> Encryption is successful; however, when I query the parquet file using Spark
> 3.3.0 without any parquet encryption configuration set up, I cannot read the
> non-encrypted field `nestedCol.sic`. I was expecting that only the
> `nestedCol.ic` field would not be queryable.
>  
>  
> Reproducer (Spark 3.3.0, using spark-shell):
> Downloaded the file
> [parquet-hadoop-1.12.0-tests.jar|https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop/1.12.0/parquet-hadoop-1.12.0-tests.jar]
> and added it to the spark-jars folder.
> Code to create encrypted data:
>  
> {code:java}
> sc.hadoopConfiguration.set("parquet.crypto.factory.class" 
> ,"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
> sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" 
> ,"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
> sc.hadoopConfiguration.set("parquet.encryption.key.list","key1a: 
> BAECAwQFBgcICQoLDA0ODw==, key2a: BAECAAECAAECAAECAAECAA==, keyz: 
> BAECAAECAAECAAECAAECAA==")
> sc.hadoopConfiguration.set("parquet.encryption.key.material.store.internally","false")
> val encryptedParquetPath = "/tmp/par_enc_footer_non_encrypted"
> valpartitionCol = 1
> case class nestedItem(ic: Int = 0, sic : Double, pc: Int = 0)
> case class SquareItem(int_column: Int, square_int_column : Double, 
> partitionCol: Int, nestedCol :nestedItem)
> val dataRange = (1 to 100).toList
> val squares = sc.parallelize(dataRange.map(i => new SquareItem(i, 
> scala.math.pow(i,2), partitionCol,nestedItem(i,i
> squares.toDS().show()
> squares.toDS().write.partitionBy("partitionCol").mode("overwrite").option("parquet.encryption.column.keys",
>  
> "key1a:square_int_column,nestedCol.ic;").option("parquet.encryption.plaintext.footer",true).option("parquet.encryption.footer.key",
>  "keyz").parquet(encryptedParquetPath)
> {code}
> Code to read the data, trying to access the non-encrypted nested field, by
> opening a new spark-shell:
>  
> {code:java}
> val encryptedParquetPath = "/tmp/par_enc_footer_non_encrypted"
> spark.sqlContext.read.parquet(encryptedParquetPath).createOrReplaceTempView("test")
> spark.sql("select nestedCol.sic from test").show(){code}
> Since nestedCol.sic is not encrypted, I was expecting to get results, but
> instead I get the error below:
>  
> {code:java}
> Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: 
> [square_int_column]. Null File Decryptor
>   at 
> org.apache.parquet.hadoop.metadata.EncryptedColumnChunkMetaData.decryptIfNeeded(ColumnChunkMetaData.java:602)
>   at 
> org.apache.parquet.hadoop.metadata.ColumnChunkMetaData.getEncodings(ColumnChunkMetaData.java:348)
>   at 
> org.apache.parquet.hadoop.ParquetRecordReader.checkDeltaByteArrayProblem(ParquetRecordReader.java:191)
>   at 
> 

[jira] [Commented] (PARQUET-2276) ParquetReader reads do not work with Hadoop version 2.8.5

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718738#comment-17718738
 ] 

ASF GitHub Bot commented on PARQUET-2276:
-

Fokko commented on PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#issuecomment-1532200533

   > I think this should work after comparing it with older code, but it seems 
like there are some easy improvements to me.
   
   @rdblue I agree, I did some cleaning up. Let me know what you think.




> ParquetReader reads do not work with Hadoop version 2.8.5
> -
>
> Key: PARQUET-2276
> URL: https://issues.apache.org/jira/browse/PARQUET-2276
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Atul Mohan
>Assignee: Fokko Driesprong
>Priority: Major
> Fix For: 1.14.0, 1.13.1
>
>
> {{ParquetReader.read() fails with the following exception on parquet-mr 
> version 1.13.0 when using hadoop version 2.8.5:}}
> {code:java}
>  java.lang.NoSuchMethodError: 'boolean 
> org.apache.hadoop.fs.FSDataInputStream.hasCapability(java.lang.String)' 
> at 
> org.apache.parquet.hadoop.util.HadoopStreams.isWrappedStreamByteBufferReadable(HadoopStreams.java:74)
>  
> at org.apache.parquet.hadoop.util.HadoopStreams.wrap(HadoopStreams.java:49) 
> at 
> org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:787)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) 
> at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:162) 
> org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
> {code}
>  
>  
>  
> From an initial investigation, it looks like HadoopStreams has started using 
> [FSDataInputStream.hasCapability|https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java#L74]
>  but _FSDataInputStream_ does not have the _hasCapability_ API in [hadoop 
> 2.8.x|https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FSDataInputStream.html].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] Fokko commented on pull request #1084: PARQUET-2276: Bring back support for Hadoop 2.7.3

2023-05-02 Thread via GitHub


Fokko commented on PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#issuecomment-1532200533

   > I think this should work after comparing it with older code, but it seems 
like there are some easy improvements to me.
   
   @rdblue I agree, I did some cleaning up. Let me know what you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2276) ParquetReader reads do not work with Hadoop version 2.8.5

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718736#comment-17718736
 ] 

ASF GitHub Bot commented on PARQUET-2276:
-

Fokko commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183080135


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));
+}
+if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
+  byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+  try {
+return h2SeekableConstructor.newInstance(stream);
+  } catch (InstantiationException | IllegalAccessException e) {
+LOG.warn("Could not instantiate H2SeekableInputStream, falling back to 
byte array reads", e);
+  } catch (InvocationTargetException e) {
+throw new ParquetDecodingException(
+  "Could not instantiate H2SeekableInputStream", 
e.getTargetException());
+  }
 }
+return new H1SeekableInputStream(stream);
   }
 
   /**
* Is the inner stream byte buffer readable?
-   * The test is "the stream is not FSDataInputStream
+   * The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
-   * other than FSDataInputStream successfuly support read(ByteBuffer).
-   * This is true for all filesytem clients the hadoop codebase.
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients the hadoop codebase.
*
* In hadoop 3.3.0+, the StreamCapabilities probe can be used to
* check this: only those streams which provide the read(ByteBuffer)
* semantics MAY return true for the probe "in:readbytebuffer";
* FSDataInputStream will pass the probe down to the underlying stream.
*
* @param stream stream to probe
-   * @return true if it is safe to a H2SeekableInputStream to access the data
+   * @return true if it is safe to a H2SeekableInputStream to access
+   * the data, null when it cannot be determined
*/
-  private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream 
stream) {
-if (stream.hasCapability("in:readbytebuffer")) {
-  // stream is issuing the guarantee that it implements the
-  // API. Holds for all implementations in hadoop-*
-  // since Hadoop 3.3.0 (HDFS-14111).
-  return true;
+  private static Boolean 
isWrappedStreamByteBufferReadableHasCapabilities(FSDataInputStream stream) {
+Method methodHasCapabilities;
+try {
+  methodHasCapabilities = stream.getClass().getMethod("hasCapability", 
String.class);

Review Comment:
   Added it





> ParquetReader reads do not work with Hadoop version 2.8.5
> -
>
> Key: PARQUET-2276
> URL: https://issues.apache.org/jira/browse/PARQUET-2276
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Atul Mohan
>Assignee: Fokko Driesprong
>Priority: Major
> Fix For: 1.14.0, 1.13.1
>
>
> {{ParquetReader.read() fails with the following exception on 

[jira] [Commented] (PARQUET-2276) ParquetReader reads do not work with Hadoop version 2.8.5

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718735#comment-17718735
 ] 

ASF GitHub Bot commented on PARQUET-2276:
-

Fokko commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183079903


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -37,6 +41,39 @@ public class HadoopStreams {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final Class byteBufferReadableClass = getReadableClass();
+  static final Constructor h2SeekableConstructor = 
getH2SeekableConstructor();

Review Comment:
   Yes, I copied most from the old code to avoid refactoring. I think we can 
greatly simplify it because it was still taking Hadoop1 into account. We still 
have to check if the wrapped stream is ByteBufferReadable: 
https://github.com/apache/hadoop/blob/release-2.4.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java#L142-L148
   
   The `hasCapabilities` probe does the same thing, but in a more elegant way.
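For context, a minimal side-by-side sketch of the two probes being discussed, using only public Hadoop APIs (`FSDataInputStream.getWrappedStream()`, the `ByteBufferReadable` marker interface, and the `hasCapability` probe available on newer Hadoop versions); this is illustrative, not the code in the PR:

{code:java}
import java.io.InputStream;

import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;

class ByteBufferProbeSketch {
  // Legacy probe: unwrap FSDataInputStream layers and check for the
  // ByteBufferReadable marker interface on the innermost stream.
  static boolean isByteBufferReadableLegacy(FSDataInputStream stream) {
    InputStream wrapped = stream.getWrappedStream();
    if (wrapped instanceof FSDataInputStream) {
      return isByteBufferReadableLegacy((FSDataInputStream) wrapped);
    }
    return wrapped instanceof ByteBufferReadable;
  }

  // Capability probe (Hadoop 3.3.0+ for "in:readbytebuffer"): ask the
  // stream itself, which delegates the question to the wrapped stream.
  static boolean isByteBufferReadableByCapability(FSDataInputStream stream) {
    return stream.hasCapability("in:readbytebuffer");
  }
}
{code}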





> ParquetReader reads do not work with Hadoop version 2.8.5
> -
>
> Key: PARQUET-2276
> URL: https://issues.apache.org/jira/browse/PARQUET-2276
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Atul Mohan
>Assignee: Fokko Driesprong
>Priority: Major
> Fix For: 1.14.0, 1.13.1
>
>
> {{ParquetReader.read() fails with the following exception on parquet-mr 
> version 1.13.0 when using hadoop version 2.8.5:}}
> {code:java}
>  java.lang.NoSuchMethodError: 'boolean 
> org.apache.hadoop.fs.FSDataInputStream.hasCapability(java.lang.String)' 
> at 
> org.apache.parquet.hadoop.util.HadoopStreams.isWrappedStreamByteBufferReadable(HadoopStreams.java:74)
>  
> at org.apache.parquet.hadoop.util.HadoopStreams.wrap(HadoopStreams.java:49) 
> at 
> org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:787)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) 
> at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:162) 
> org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
> {code}
>  
>  
>  
> From an initial investigation, it looks like HadoopStreams has started using 
> [FSDataInputStream.hasCapability|https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java#L74]
>  but _FSDataInputStream_ does not have the _hasCapability_ API in [hadoop 
> 2.8.x|https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FSDataInputStream.html].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] Fokko commented on a diff in pull request #1084: PARQUET-2276: Bring back support for Hadoop 2.7.3

2023-05-02 Thread via GitHub


Fokko commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183080135


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));
+}
+if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
+  byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+  try {
+return h2SeekableConstructor.newInstance(stream);
+  } catch (InstantiationException | IllegalAccessException e) {
+LOG.warn("Could not instantiate H2SeekableInputStream, falling back to 
byte array reads", e);
+  } catch (InvocationTargetException e) {
+throw new ParquetDecodingException(
+  "Could not instantiate H2SeekableInputStream", 
e.getTargetException());
+  }
 }
+return new H1SeekableInputStream(stream);
   }
 
   /**
* Is the inner stream byte buffer readable?
-   * The test is "the stream is not FSDataInputStream
+   * The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
-   * other than FSDataInputStream successfuly support read(ByteBuffer).
-   * This is true for all filesytem clients the hadoop codebase.
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients the hadoop codebase.
*
* In hadoop 3.3.0+, the StreamCapabilities probe can be used to
* check this: only those streams which provide the read(ByteBuffer)
* semantics MAY return true for the probe "in:readbytebuffer";
* FSDataInputStream will pass the probe down to the underlying stream.
*
* @param stream stream to probe
-   * @return true if it is safe to a H2SeekableInputStream to access the data
+   * @return true if it is safe to a H2SeekableInputStream to access
+   * the data, null when it cannot be determined
*/
-  private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream 
stream) {
-if (stream.hasCapability("in:readbytebuffer")) {
-  // stream is issuing the guarantee that it implements the
-  // API. Holds for all implementations in hadoop-*
-  // since Hadoop 3.3.0 (HDFS-14111).
-  return true;
+  private static Boolean 
isWrappedStreamByteBufferReadableHasCapabilities(FSDataInputStream stream) {
+Method methodHasCapabilities;
+try {
+  methodHasCapabilities = stream.getClass().getMethod("hasCapability", 
String.class);

Review Comment:
   Added it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [parquet-mr] Fokko commented on a diff in pull request #1084: PARQUET-2276: Bring back support for Hadoop 2.7.3

2023-05-02 Thread via GitHub


Fokko commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183079903


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -37,6 +41,39 @@ public class HadoopStreams {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final Class byteBufferReadableClass = getReadableClass();
+  static final Constructor h2SeekableConstructor = 
getH2SeekableConstructor();

Review Comment:
   Yes, I copied most from the old code to avoid refactoring. I think we can 
greatly simplify it because it was still taking Hadoop1 into account. We still 
have to check if the wrapped stream is ByteBufferReadable: 
https://github.com/apache/hadoop/blob/release-2.4.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java#L142-L148
   
   The `hasCapabilities` probe does the same thing, but in a more elegant way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2276) ParquetReader reads do not work with Hadoop version 2.8.5

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718728#comment-17718728
 ] 

ASF GitHub Bot commented on PARQUET-2276:
-

Fokko commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183066231


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));

Review Comment:
   This came from the issue from Presto: 
https://github.com/prestodb/presto/pull/17435





> ParquetReader reads do not work with Hadoop version 2.8.5
> -
>
> Key: PARQUET-2276
> URL: https://issues.apache.org/jira/browse/PARQUET-2276
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Atul Mohan
>Assignee: Fokko Driesprong
>Priority: Major
> Fix For: 1.14.0, 1.13.1
>
>
> {{ParquetReader.read() fails with the following exception on parquet-mr 
> version 1.13.0 when using hadoop version 2.8.5:}}
> {code:java}
>  java.lang.NoSuchMethodError: 'boolean 
> org.apache.hadoop.fs.FSDataInputStream.hasCapability(java.lang.String)' 
> at 
> org.apache.parquet.hadoop.util.HadoopStreams.isWrappedStreamByteBufferReadable(HadoopStreams.java:74)
>  
> at org.apache.parquet.hadoop.util.HadoopStreams.wrap(HadoopStreams.java:49) 
> at 
> org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:787)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) 
> at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:162) 
> org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
> {code}
>  
>  
>  
> From an initial investigation, it looks like HadoopStreams has started using 
> [FSDataInputStream.hasCapability|https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java#L74]
>  but _FSDataInputStream_ does not have the _hasCapability_ API in [hadoop 
> 2.8.x|https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FSDataInputStream.html].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] Fokko commented on a diff in pull request #1084: PARQUET-2276: Bring back support for Hadoop 2.7.3

2023-05-02 Thread via GitHub


Fokko commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183066231


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));

Review Comment:
   This came from the issue from Presto: 
https://github.com/prestodb/presto/pull/17435



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2276) ParquetReader reads do not work with Hadoop version 2.8.5

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718727#comment-17718727
 ] 

ASF GitHub Bot commented on PARQUET-2276:
-

rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183059127


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));

Review Comment:
   Why would a FSDataInputStream have another one inside?



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));
+}
+if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
+  byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+  try {
+return h2SeekableConstructor.newInstance(stream);
+  } catch (InstantiationException | IllegalAccessException e) {
+LOG.warn("Could not instantiate H2SeekableInputStream, falling back to 
byte array reads", e);
+  } catch (InvocationTargetException e) {
+throw new ParquetDecodingException(
+  "Could not instantiate H2SeekableInputStream", 
e.getTargetException());
+  }
 }
+return new H1SeekableInputStream(stream);
   }
 
   /**
* Is the inner stream byte buffer readable?
-   * The test is "the stream is not FSDataInputStream
+   * The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
-   * other than FSDataInputStream successfuly support read(ByteBuffer).
-   * This is true for all filesytem clients the hadoop codebase.
+   * other than FSDataInputStream successfully 

[GitHub] [parquet-mr] rdblue commented on a diff in pull request #1084: PARQUET-2276: Bring back support for Hadoop 2.7.3

2023-05-02 Thread via GitHub


rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183059127


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));

Review Comment:
   Why would a FSDataInputStream have another one inside?



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));
+}
+if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
+  byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+  try {
+return h2SeekableConstructor.newInstance(stream);
+  } catch (InstantiationException | IllegalAccessException e) {
+LOG.warn("Could not instantiate H2SeekableInputStream, falling back to 
byte array reads", e);
+  } catch (InvocationTargetException e) {
+throw new ParquetDecodingException(
+  "Could not instantiate H2SeekableInputStream", 
e.getTargetException());
+  }
 }
+return new H1SeekableInputStream(stream);
   }
 
   /**
* Is the inner stream byte buffer readable?
-   * The test is "the stream is not FSDataInputStream
+   * The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
-   * other than FSDataInputStream successfuly support read(ByteBuffer).
-   * This is true for all filesytem clients the hadoop codebase.
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients the hadoop codebase.
*
* In hadoop 3.3.0+, the StreamCapabilities probe can be used to
* check this: only those streams which provide the read(ByteBuffer)
* 

[jira] [Commented] (PARQUET-2276) ParquetReader reads do not work with Hadoop version 2.8.5

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718726#comment-17718726
 ] 

ASF GitHub Bot commented on PARQUET-2276:
-

rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183057530


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -37,6 +41,39 @@ public class HadoopStreams {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final Class byteBufferReadableClass = getReadableClass();
+  static final Constructor h2SeekableConstructor = 
getH2SeekableConstructor();

Review Comment:
   Huh, I guess this was how it was before? Nevermind on the refactoring then.





> ParquetReader reads do not work with Hadoop version 2.8.5
> -
>
> Key: PARQUET-2276
> URL: https://issues.apache.org/jira/browse/PARQUET-2276
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Atul Mohan
>Assignee: Fokko Driesprong
>Priority: Major
> Fix For: 1.14.0, 1.13.1
>
>
> {{ParquetReader.read() fails with the following exception on parquet-mr 
> version 1.13.0 when using hadoop version 2.8.5:}}
> {code:java}
>  java.lang.NoSuchMethodError: 'boolean 
> org.apache.hadoop.fs.FSDataInputStream.hasCapability(java.lang.String)' 
> at 
> org.apache.parquet.hadoop.util.HadoopStreams.isWrappedStreamByteBufferReadable(HadoopStreams.java:74)
>  
> at org.apache.parquet.hadoop.util.HadoopStreams.wrap(HadoopStreams.java:49) 
> at 
> org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:787)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) 
> at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:162) 
> org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
> {code}
>  
>  
>  
> From an initial investigation, it looks like HadoopStreams has started using 
> [FSDataInputStream.hasCapability|https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java#L74]
>  but _FSDataInputStream_ does not have the _hasCapability_ API in [hadoop 
> 2.8.x|https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FSDataInputStream.html].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] rdblue commented on a diff in pull request #1084: PARQUET-2276: Bring back support for Hadoop 2.7.3

2023-05-02 Thread via GitHub


rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1183057530


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -37,6 +41,39 @@ public class HadoopStreams {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final Class byteBufferReadableClass = getReadableClass();
+  static final Constructor h2SeekableConstructor = 
getH2SeekableConstructor();

Review Comment:
   Huh, I guess this was how it was before? Nevermind on the refactoring then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2292) Improve default SpecificRecord model selection for Avro{Write,Read}Support

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718694#comment-17718694
 ] 

ASF GitHub Bot commented on PARQUET-2292:
-

clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182943149


##
parquet-avro/pom.xml:
##
@@ -105,6 +110,30 @@
   test-jar
   test
 
+
+  org.mockito
+  mockito-core
+  2.23.0

Review Comment:
   the declared `mockito.version` in the root `pom.xml`, `1.10.19`, is 
incompatible with Powermock 2.0.x but I can't downgrade Powermock to 1.x 
without the test throwing some scary objenesis errors, so I think Powermock 2.x 
is the lowest we can go.





> Improve default SpecificRecord model selection for Avro{Write,Read}Support
> --
>
> Key: PARQUET-2292
> URL: https://issues.apache.org/jira/browse/PARQUET-2292
> Project: Parquet
>  Issue Type: Improvement
>Reporter: Claire McGinty
>Assignee: Claire McGinty
>Priority: Major
>
> AvroWriteSupport/AvroReadSupport can improve the precision of their default 
> `model` selection. Currently they default to new 
> SpecificDataSupplier().get()[0]. This means that SpecificRecord classes that 
> contain logical types will fail out-of-the-box unless a specific 
> DATA_SUPPLIER is configured that contains logical type conversions.
> I think we can improve this and make logical types work by default by 
> defaulting to the value of the `MODEL$` field that every SpecificRecordBase 
> implementation contains, which already contains all the logical conversions 
> for that Avro type. It would require reflection, but that's what the Avro 
> library is already doing to fetch models for Specific types[1].
>  
> [0] 
> [https://github.com/apache/parquet-mr/blob/d38044f5395494e1543581a4b763f624305d3022/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java#L403-L407]
> [1] 
> https://github.com/apache/avro/blob/release-1.11.1/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java#L76-L86



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] clairemcginty commented on a diff in pull request #1078: PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field

2023-05-02 Thread via GitHub


clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182943149


##
parquet-avro/pom.xml:
##
@@ -105,6 +110,30 @@
   test-jar
   test
 
+
+  org.mockito
+  mockito-core
+  2.23.0

Review Comment:
   the declared `mockito.version` in the root `pom.xml`, `1.10.19`, is 
incompatible with Powermock 2.0.x but I can't downgrade Powermock to 1.x 
without the test throwing some scary objenesis errors, so I think Powermock 2.x 
is the lowest we can go.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2276) ParquetReader reads do not work with Hadoop version 2.8.5

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718680#comment-17718680
 ] 

ASF GitHub Bot commented on PARQUET-2276:
-

rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1182873931


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));
+}
+if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
+  byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+  try {
+return h2SeekableConstructor.newInstance(stream);
+  } catch (InstantiationException | IllegalAccessException e) {
+LOG.warn("Could not instantiate H2SeekableInputStream, falling back to 
byte array reads", e);
+  } catch (InvocationTargetException e) {
+throw new ParquetDecodingException(
+  "Could not instantiate H2SeekableInputStream", 
e.getTargetException());
+  }
 }
+return new H1SeekableInputStream(stream);
   }
 
   /**
* Is the inner stream byte buffer readable?
-   * The test is "the stream is not FSDataInputStream
+   * The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
-   * other than FSDataInputStream successfuly support read(ByteBuffer).
-   * This is true for all filesytem clients the hadoop codebase.
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients the hadoop codebase.
*
* In hadoop 3.3.0+, the StreamCapabilities probe can be used to
* check this: only those streams which provide the read(ByteBuffer)
* semantics MAY return true for the probe "in:readbytebuffer";
* FSDataInputStream will pass the probe down to the underlying stream.
*
* @param stream stream to probe
-   * @return true if it is safe to a H2SeekableInputStream to access the data
+   * @return true if it is safe to a H2SeekableInputStream to access
+   * the data, null when it cannot be determined
*/
-  private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream 
stream) {
-if (stream.hasCapability("in:readbytebuffer")) {
-  // stream is issuing the guarantee that it implements the
-  // API. Holds for all implementations in hadoop-*
-  // since Hadoop 3.3.0 (HDFS-14111).
-  return true;
+  private static Boolean 
isWrappedStreamByteBufferReadableHasCapabilities(FSDataInputStream stream) {
+Method methodHasCapabilities;
+try {
+  methodHasCapabilities = stream.getClass().getMethod("hasCapability", 
String.class);

Review Comment:
   You can use DynMethods to get an unbound `hasCapability` method. That can be 
done statically, so all you need to do is check whether it is present and call 
it.
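For reference, a rough sketch of what a statically resolved, unbound `hasCapability` probe could look like with parquet-common's `DynMethods`; the exact builder calls in the eventual patch may differ:

{code:java}
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.util.DynMethods;

class CapabilityProbeSketch {
  // Resolved once, statically; becomes a no-op when the method is absent
  // (e.g. on Hadoop 2.8.x, where FSDataInputStream has no hasCapability).
  private static final DynMethods.UnboundMethod HAS_CAPABILITY =
      new DynMethods.Builder("hasCapability")
          .impl(FSDataInputStream.class, String.class)
          .orNoop()
          .build();

  // Returns null when the capability cannot be determined on this Hadoop version.
  static Boolean supportsReadByteBuffer(FSDataInputStream stream) {
    if (HAS_CAPABILITY.isNoop()) {
      return null;
    }
    return HAS_CAPABILITY.invoke(stream, "in:readbytebuffer");
  }
}
{code}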





> ParquetReader reads do not work with Hadoop version 2.8.5
> -
>
> Key: PARQUET-2276
> URL: https://issues.apache.org/jira/browse/PARQUET-2276
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Atul Mohan
>Assignee: 

[jira] [Commented] (PARQUET-2276) ParquetReader reads do not work with Hadoop version 2.8.5

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718679#comment-17718679
 ] 

ASF GitHub Bot commented on PARQUET-2276:
-

rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1182872918


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -37,6 +41,39 @@ public class HadoopStreams {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final Class byteBufferReadableClass = getReadableClass();
+  static final Constructor h2SeekableConstructor = 
getH2SeekableConstructor();

Review Comment:
   Looks like you'd need to add the `orNull` handling. I don't see it here: 
https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/util/DynConstructors.java





> ParquetReader reads do not work with Hadoop version 2.8.5
> -
>
> Key: PARQUET-2276
> URL: https://issues.apache.org/jira/browse/PARQUET-2276
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Atul Mohan
>Assignee: Fokko Driesprong
>Priority: Major
> Fix For: 1.14.0, 1.13.1
>
>
> {{ParquetReader.read() fails with the following exception on parquet-mr 
> version 1.13.0 when using hadoop version 2.8.5:}}
> {code:java}
>  java.lang.NoSuchMethodError: 'boolean 
> org.apache.hadoop.fs.FSDataInputStream.hasCapability(java.lang.String)' 
> at 
> org.apache.parquet.hadoop.util.HadoopStreams.isWrappedStreamByteBufferReadable(HadoopStreams.java:74)
>  
> at org.apache.parquet.hadoop.util.HadoopStreams.wrap(HadoopStreams.java:49) 
> at 
> org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:787)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) 
> at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:162) 
> org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
> {code}
>  
>  
>  
> From an initial investigation, it looks like HadoopStreams has started using 
> [FSDataInputStream.hasCapability|https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java#L74]
>  but _FSDataInputStream_ does not have the _hasCapability_ API in [hadoop 
> 2.8.x|https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FSDataInputStream.html].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] rdblue commented on a diff in pull request #1084: PARQUET-2276: Bring back support for Hadoop 2.7.3

2023-05-02 Thread via GitHub


rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1182873931


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -46,41 +83,91 @@ public class HadoopStreams {
*/
   public static SeekableInputStream wrap(FSDataInputStream stream) {
 Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-if (isWrappedStreamByteBufferReadable(stream)) {
-  return new H2SeekableInputStream(stream);
-} else {
-  return new H1SeekableInputStream(stream);
+
+// Try to check using hasCapabilities(str)
+Boolean hasCapabilitiesResult = 
isWrappedStreamByteBufferReadableHasCapabilities(stream);
+
+// If it is null, then fall back to the old method
+if (hasCapabilitiesResult != null) {
+  if (hasCapabilitiesResult) {
+return new H2SeekableInputStream(stream);
+  } else {
+return new H1SeekableInputStream(stream);
+  }
+}
+
+return isWrappedStreamByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if 
the stream is not seekable
+   */
+  private static SeekableInputStream 
isWrappedStreamByteBufferReadableLegacy(FSDataInputStream stream) {
+InputStream wrapped = stream.getWrappedStream();
+if (wrapped instanceof FSDataInputStream) {
+  LOG.debug("Checking on wrapped stream {} of {} whether is 
ByteBufferReadable", wrapped, stream);
+  return isWrappedStreamByteBufferReadableLegacy(((FSDataInputStream) 
wrapped));
+}
+if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
+  byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+  try {
+return h2SeekableConstructor.newInstance(stream);
+  } catch (InstantiationException | IllegalAccessException e) {
+LOG.warn("Could not instantiate H2SeekableInputStream, falling back to 
byte array reads", e);
+  } catch (InvocationTargetException e) {
+throw new ParquetDecodingException(
+  "Could not instantiate H2SeekableInputStream", 
e.getTargetException());
+  }
 }
+return new H1SeekableInputStream(stream);
   }
 
   /**
* Is the inner stream byte buffer readable?
-   * The test is "the stream is not FSDataInputStream
+   * The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
-   * other than FSDataInputStream successfuly support read(ByteBuffer).
-   * This is true for all filesytem clients the hadoop codebase.
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients the hadoop codebase.
*
* In hadoop 3.3.0+, the StreamCapabilities probe can be used to
* check this: only those streams which provide the read(ByteBuffer)
* semantics MAY return true for the probe "in:readbytebuffer";
* FSDataInputStream will pass the probe down to the underlying stream.
*
* @param stream stream to probe
-   * @return true if it is safe to a H2SeekableInputStream to access the data
+   * @return true if it is safe to a H2SeekableInputStream to access
+   * the data, null when it cannot be determined
*/
-  private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream 
stream) {
-if (stream.hasCapability("in:readbytebuffer")) {
-  // stream is issuing the guarantee that it implements the
-  // API. Holds for all implementations in hadoop-*
-  // since Hadoop 3.3.0 (HDFS-14111).
-  return true;
+  private static Boolean 
isWrappedStreamByteBufferReadableHasCapabilities(FSDataInputStream stream) {
+Method methodHasCapabilities;
+try {
+  methodHasCapabilities = stream.getClass().getMethod("hasCapability", 
String.class);

Review Comment:
   You can use DynMethods to get an unbound `hasCapability` method. That can be 
done statically, so all you need to do is check whether it is present and call 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [parquet-mr] rdblue commented on a diff in pull request #1084: PARQUET-2276: Bring back support for Hadoop 2.7.3

2023-05-02 Thread via GitHub


rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1182872918


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -37,6 +41,39 @@ public class HadoopStreams {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final Class byteBufferReadableClass = getReadableClass();
+  static final Constructor h2SeekableConstructor = 
getH2SeekableConstructor();

Review Comment:
   Looks like you'd need to add the `orNull` handling. I don't see it here: 
https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/util/DynConstructors.java
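
   For context, a rough sketch of the kind of `orNull()` handling meant here, mirroring the
existing `orNoop()` pattern in `DynMethods`. This is entirely hypothetical -- the field and
method bodies below are illustrative, not what parquet-common currently contains:

   ```java
   // Hypothetical addition to DynConstructors.Builder
   public Builder orNull() {
     this.nullOk = true;      // assumed flag consulted by build()
     return this;
   }

   public Ctor build() {
     if (ctor != null) {
       return ctor;           // some impl(...) candidate resolved
     }
     if (nullOk) {
       return null;           // callers null-check instead of catching
     }
     throw new RuntimeException("Cannot find constructor for " + baseClass);
   }
   ```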



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2292) Improve default SpecificRecord model selection for Avro{Write,Read}Support

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718678#comment-17718678
 ] 

ASF GitHub Bot commented on PARQUET-2292:
-

clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182871206


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {

Review Comment:
   > Since we are using reflections on private members there are no 
compatibility guarantees. We shall be very careful here. What about avro 
versions prior to 1.8? Also, what if it breaks in the future? Will the related 
unit test fail for a future Avro releases (in case of upgrading the Avro 
version in the pom)?
   
   so I've tested 1.7 and 1.8; since 1.9 Avro has stably used the `MODEL$` 
field to hold all conversions, so I feel reasonably confident about relying on 
this. If that changes, we'll catch it in the new unit tests  
   
   If you want, I can surround invocations of `getModelForSchema` in a 
try/catch (in `AvroReadSupport`/`AvroWriteSupport`), and just use the default 
SpecificDataSupplier if they throw anything. That way any unexpected behavior 
would just result in logical types not being used.
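
   A minimal sketch of that fallback, reusing only names already discussed in this thread
(`getModelForSchema`, `SpecificDataSupplier`); the helper name and its exact placement in
`AvroReadSupport`/`AvroWriteSupport` are assumptions for illustration:

   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;

   // Sketch: prefer the reflected model, but never let reflection failures break reads/writes.
   static GenericData modelOrDefault(Schema schema) {
     GenericData model;
     try {
       model = AvroRecordConverter.getModelForSchema(schema);
     } catch (RuntimeException e) {
       model = null;   // any unexpected failure just means: no reflected model
     }
     // default behaviour today; logical types then only work if a suitable DATA_SUPPLIER is configured
     return model != null ? model : new SpecificDataSupplier().get();
   }
   ```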





> Improve default SpecificRecord model selection for Avro{Write,Read}Support
> --
>
> Key: PARQUET-2292
> URL: https://issues.apache.org/jira/browse/PARQUET-2292
> Project: Parquet
>  Issue Type: Improvement
>Reporter: Claire McGinty
>Assignee: Claire McGinty
>Priority: Major
>
> AvroWriteSupport/AvroReadSupport can improve the precision of their default 
> `model` selection. Currently they default to new 
> SpecificDataSupplier().get()[0]. This means that SpecificRecord classes that 
> contain logical types will fail out-of-the-box unless a specific 
> DATA_SUPPLIER is configured that contains logical type conversions.
> I think we can improve this and make logical types work by default by 
> defaulting to the value of the `MODEL$` field that every SpecificRecordBase 
> implementation contains, which already contains all the logical conversions 
> for that Avro type. It would require reflection, but that's what the Avro 
> library is already doing to fetch models for Specific types[1].
>  
> [0] 
> [https://github.com/apache/parquet-mr/blob/d38044f5395494e1543581a4b763f624305d3022/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java#L403-L407]
> [1] 
> https://github.com/apache/avro/blob/release-1.11.1/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java#L76-L86



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] clairemcginty commented on a diff in pull request #1078: PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field

2023-05-02 Thread via GitHub


clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182871206


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {

Review Comment:
   > Since we are using reflections on private members there are no 
compatibility guarantees. We shall be very careful here. What about avro 
versions prior to 1.8? Also, what if it breaks in the future? Will the related 
unit test fail for a future Avro releases (in case of upgrading the Avro 
version in the pom)?
   
   so I've tested 1.7 and 1.8; since 1.9 Avro has stably used the `MODEL$` 
field to hold all conversions, so I feel reasonably confident about relying on 
this. If that changes, we'll catch it in the new unit tests  
   
   If you want, I can surround invocations of `getModelForSchema` in a 
try/catch (in `AvroReadSupport`/`AvroWriteSupport`), and just use the default 
SpecificDataSupplier if they throw anything. That way any unexpected behavior 
would just result in logical types not being used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2292) Improve default SpecificRecord model selection for Avro{Write,Read}Support

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718675#comment-17718675
 ] 

ASF GitHub Bot commented on PARQUET-2292:
-

clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182865579


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {

Review Comment:
   ok, I've added a new test suite, `TestAvroRecordConverter`, that thoroughly 
tests `getModelForSchema` with a variety of schema variants and with Avro 
versions 1.{7,8,9,10,11}. To do this, I used Powermock to mock the static 
invocation used to get Avro runtime version, then I ad-hoc compiled Avro 
generated classes using compiler versions 1.{7,8,9,10,11}. Happy to take a 
different approach with testing if you prefer 
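
   As a rough illustration of the kind of assertion such a suite can make (the generated class
name below is a placeholder; the real tests in the PR compile fixtures against several Avro
compiler versions):

   ```java
   import static org.junit.Assert.assertNotNull;

   import org.apache.avro.LogicalTypes;
   import org.apache.avro.specific.SpecificData;
   import org.junit.Test;

   public class TestAvroRecordConverterSketch {
     @Test
     public void modelForSpecificRecordCarriesLogicalTypeConversions() {
       // LogicalTypesRecord stands in for an Avro-generated SpecificRecord
       // that declares e.g. a timestamp-millis field.
       SpecificData model = AvroRecordConverter.getModelForSchema(LogicalTypesRecord.getClassSchema());
       assertNotNull(model);
       assertNotNull(model.getConversionFor(LogicalTypes.timestampMillis()));
     }
   }
   ```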

> Improve default SpecificRecord model selection for Avro{Write,Read}Support
> --
>
> Key: PARQUET-2292
> URL: https://issues.apache.org/jira/browse/PARQUET-2292
> Project: Parquet
>  Issue Type: Improvement
>Reporter: Claire McGinty
>Assignee: Claire McGinty
>Priority: Major
>
> AvroWriteSupport/AvroReadSupport can improve the precision of their default 
> `model` selection. Currently they default to new 
> SpecificDataSupplier().get()[0]. This means that SpecificRecord classes that 
> contain logical types will fail out-of-the-box unless a specific 
> DATA_SUPPLIER is configured that contains logical type conversions.
> I think we can improve this and make logical types work by default by 
> defaulting to the value of the `MODEL$` field that every SpecificRecordBase 
> implementation contains, which already contains all the logical conversions 
> for that Avro type. It would require reflection, but that's what the Avro 
> library is already doing to fetch models for Specific types[1].
>  
> [0] 
> [https://github.com/apache/parquet-mr/blob/d38044f5395494e1543581a4b763f624305d3022/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java#L403-L407]
> [1] 
> https://github.com/apache/avro/blob/release-1.11.1/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java#L76-L86



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (PARQUET-2292) Improve default SpecificRecord model selection for Avro{Write,Read}Support

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718676#comment-17718676
 ] 

ASF GitHub Bot commented on PARQUET-2292:
-

clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182866610


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {

Review Comment:
   Added clearer logging & null-/error-handling.





> Improve default SpecificRecord model selection for Avro{Write,Read}Support
> --
>
> Key: PARQUET-2292
> URL: https://issues.apache.org/jira/browse/PARQUET-2292
> Project: Parquet
>  Issue Type: Improvement
>Reporter: Claire McGinty
>Assignee: Claire McGinty
>Priority: Major
>
> AvroWriteSupport/AvroReadSupport can improve the precision of their default 
> `model` selection. Currently they default to new 
> SpecificDataSupplier().get()[0]. This means that SpecificRecord classes that 
> contain logical types will fail out-of-the-box unless a specific 
> DATA_SUPPLIER is configured that contains logical type conversions.
> I think we can improve this and make logical types work by default by 
> defaulting to the value of the `MODEL$` field that every SpecificRecordBase 
> implementation contains, which already contains all the logical conversions 
> for that Avro type. It would require reflection, but that's what the Avro 
> library is already doing to fetch models for Specific types[1].
>  
> [0] 
> [https://github.com/apache/parquet-mr/blob/d38044f5395494e1543581a4b763f624305d3022/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java#L403-L407]
> [1] 
> https://github.com/apache/avro/blob/release-1.11.1/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java#L76-L86



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] clairemcginty commented on a diff in pull request #1078: PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field

2023-05-02 Thread via GitHub


clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182866610


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {

Review Comment:
   Added clearer logging & null-/error-handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [parquet-mr] clairemcginty commented on a diff in pull request #1078: PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field

2023-05-02 Thread via GitHub


clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182865579


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {

Review Comment:
   ok, I've added a new test suite, `TestAvroRecordConverter`, that thoroughly 
tests `getModelForSchema` with a variety of schema variants and with Avro 
versions 1.{7,8,9,10,11}. To do this, I used Powermock to mock the static 
invocation used to get Avro runtime version, then I ad-hoc compiled Avro 
generated classes using compiler versions 1.{7,8,9,10,11}. Happy to take a 
different approach with testing if you prefer -- this just seemed like the most 
straightforward way to do it :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2276) ParquetReader reads do not work with Hadoop version 2.8.5

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718672#comment-17718672
 ] 

ASF GitHub Bot commented on PARQUET-2276:
-

rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1182862929


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -37,6 +41,39 @@ public class HadoopStreams {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final Class byteBufferReadableClass = getReadableClass();
+  static final Constructor h2SeekableConstructor = getH2SeekableConstructor();

Review Comment:
   Rather than using two static methods, you can use `DynConstructors` instead 
to make this one expression and reduce error handling code:
   
   ```java
   private static final DynConstructors.Ctor h2streamCtor =
       new DynConstructors.Builder(SeekableInputStream.class)
           .impl("org.apache.parquet.hadoop.util.H2SeekableInputStream", FSDataInputStream.class)
           .orNull()
           .build();

   ...
   if (h2streamCtor != null) {
     return h2streamCtor.newInstance(stream);
   }
   ```





> ParquetReader reads do not work with Hadoop version 2.8.5
> -
>
> Key: PARQUET-2276
> URL: https://issues.apache.org/jira/browse/PARQUET-2276
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.13.0
>Reporter: Atul Mohan
>Assignee: Fokko Driesprong
>Priority: Major
> Fix For: 1.14.0, 1.13.1
>
>
> {{ParquetReader.read() fails with the following exception on parquet-mr 
> version 1.13.0 when using hadoop version 2.8.5:}}
> {code:java}
>  java.lang.NoSuchMethodError: 'boolean 
> org.apache.hadoop.fs.FSDataInputStream.hasCapability(java.lang.String)' 
> at 
> org.apache.parquet.hadoop.util.HadoopStreams.isWrappedStreamByteBufferReadable(HadoopStreams.java:74)
>  
> at org.apache.parquet.hadoop.util.HadoopStreams.wrap(HadoopStreams.java:49) 
> at 
> org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:787)
>  
> at 
> org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) 
> at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:162) 
> org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
> {code}
>  
>  
>  
> From an initial investigation, it looks like HadoopStreams has started using 
> [FSDataInputStream.hasCapability|https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java#L74]
>  but _FSDataInputStream_ does not have the _hasCapability_ API in [hadoop 
> 2.8.x|https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FSDataInputStream.html].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] rdblue commented on a diff in pull request #1084: PARQUET-2276: Bring back support for Hadoop 2.7.3

2023-05-02 Thread via GitHub


rdblue commented on code in PR #1084:
URL: https://github.com/apache/parquet-mr/pull/1084#discussion_r1182862929


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java:
##
@@ -37,6 +41,39 @@ public class HadoopStreams {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final Class byteBufferReadableClass = getReadableClass();
+  static final Constructor h2SeekableConstructor = getH2SeekableConstructor();

Review Comment:
   Rather than using two static methods, you can use `DynConstructors` instead 
to make this one expression and reduce error handling code:
   
   ```java
   private static final DynConstructors.Ctor h2streamCtor =
       new DynConstructors.Builder(SeekableInputStream.class)
           .impl("org.apache.parquet.hadoop.util.H2SeekableInputStream", FSDataInputStream.class)
           .orNull()
           .build();

   ...
   if (h2streamCtor != null) {
     return h2streamCtor.newInstance(stream);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2292) Improve default SpecificRecord model selection for Avro{Write,Read}Support

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718571#comment-17718571
 ] 

ASF GitHub Bot commented on PARQUET-2292:
-

clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182544574


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {

Review Comment:
   yeah - it's unfortunate that this isn't solvable without reflection. I've 
manually tested this with earlier versions of Avro... let me see if I can 
reshape them into automated tests for `parquet-avro`  





> Improve default SpecificRecord model selection for Avro{Write,Read}Support
> --
>
> Key: PARQUET-2292
> URL: https://issues.apache.org/jira/browse/PARQUET-2292
> Project: Parquet
>  Issue Type: Improvement
>Reporter: Claire McGinty
>Assignee: Claire McGinty
>Priority: Major
>
> AvroWriteSupport/AvroReadSupport can improve the precision of their default 
> `model` selection. Currently they default to new 
> SpecificDataSupplier().get()[0]. This means that SpecificRecord classes that 
> contain logical types will fail out-of-the-box unless a specific 
> DATA_SUPPLIER is configured that contains logical type conversions.
> I think we can improve this and make logical types work by default by 
> defaulting to the value of the `MODEL$` field that every SpecificRecordBase 
> implementation contains, which already contains all the logical conversions 
> for that Avro type. It would require reflection, but that's what the Avro 
> library is already doing to fetch models for Specific types[1].
>  
> [0] 
> [https://github.com/apache/parquet-mr/blob/d38044f5395494e1543581a4b763f624305d3022/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java#L403-L407]
> [1] 
> https://github.com/apache/avro/blob/release-1.11.1/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java#L76-L86



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] clairemcginty commented on a diff in pull request #1078: PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field

2023-05-02 Thread via GitHub


clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182544574


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {

Review Comment:
   yeah - it's unfortunate that this isn't solvable without reflection. I've 
manually tested this with earlier versions of Avro... let me see if I can 
reshape them into automated tests for `parquet-avro`  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2292) Improve default SpecificRecord model selection for Avro{Write,Read}Support

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718570#comment-17718570
 ] 

ASF GitHub Bot commented on PARQUET-2292:
-

clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182538420


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {

Review Comment:
   that sounds good! I had planned to add logging originally, but I saw that 
there were 0 log statements in parquet-avro (and the slf4j dependencies for the 
module are 
[scoped](https://github.com/apache/parquet-mr/blob/master/parquet-avro/pom.xml#L76-L80)
 to `test`). So I'd have to import slf4j into the compile scope as well.





> Improve default SpecificRecord model selection for Avro{Write,Read}Support
> --
>
> Key: PARQUET-2292
> URL: https://issues.apache.org/jira/browse/PARQUET-2292
> Project: Parquet
>  Issue Type: Improvement
>Reporter: Claire McGinty
>Assignee: Claire McGinty
>Priority: Major
>
> AvroWriteSupport/AvroReadSupport can improve the precision of their default 
> `model` selection. Currently they default to new 
> SpecificDataSupplier().get()[0]. This means that SpecificRecord classes that 
> contain logical types will fail out-of-the-box unless a specific 
> DATA_SUPPLIER is configured that contains logical type conversions.
> I think we can improve this and make logical types work by default by 
> defaulting to the value of the `MODEL$` field that every SpecificRecordBase 
> implementation contains, which already contains all the logical conversions 
> for that Avro type. It would require reflection, but that's what the Avro 
> library is already doing to fetch models for Specific types[1].
>  
> [0] 
> [https://github.com/apache/parquet-mr/blob/d38044f5395494e1543581a4b763f624305d3022/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java#L403-L407]
> [1] 
> https://github.com/apache/avro/blob/release-1.11.1/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java#L76-L86



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [parquet-mr] clairemcginty commented on a diff in pull request #1078: PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field

2023-05-02 Thread via GitHub


clairemcginty commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182538420


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {

Review Comment:
   that sounds good! I had planned to add logging originally, but I saw that 
there were 0 log statements in parquet-avro (and the slf4j dependencies for the 
module are 
[scoped](https://github.com/apache/parquet-mr/blob/master/parquet-avro/pom.xml#L76-L80)
 to `test`). So I'd have to import slf4j into the compile scope as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-2193) Encrypting only one field in nested field prevents reading of other fields in nested field without keys

2023-05-02 Thread Vignesh Nageswaran (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718567#comment-17718567
 ] 

Vignesh Nageswaran commented on PARQUET-2193:
-

[~gershinsky] Sir, could you please let us know whether there will be a permanent 
fix that does not require setting the parameter `parquet.split.files` to false?

> Encrypting only one field in nested field prevents reading of other fields in 
> nested field without keys
> ---
>
> Key: PARQUET-2193
> URL: https://issues.apache.org/jira/browse/PARQUET-2193
> Project: Parquet
>  Issue Type: New Feature
>  Components: parquet-mr
>Affects Versions: 1.12.0
>Reporter: Vignesh Nageswaran
>Priority: Major
>
> Hi Team,
> While exploring parquet encryption, it is found that, if a field in nested 
> column is encrypted , and If I want to read this parquet directory from other 
> applications which does not have encryption keys to decrypt it, I cannot read 
> the remaining fields of the nested column without keys. 
> Example 
> `
> {code:java}
> case class nestedItem(ic: Int = 0, sic : Double, pc: Int = 0)
> case class SquareItem(int_column: Int, square_int_column : Double, 
> partitionCol: Int, nestedCol :nestedItem)
> `{code}
> In the case class `SquareItem` , `nestedCol` field is nested field and I want 
> to encrypt a field `ic` within it. 
>  
> I also want the footer to be non encrypted , so that I can use the encrypted 
> parquet file by legacy applications. 
>  
> Encryption is successful, however, when I query the parquet file using spark 
> 3.3.0 without having any configuration for parquet encryption set up , I 
> cannot non encrypted fields of `nestedCol` `sic`. I was expecting that only 
> `nestedCol` `ic` field will not be querable.
>  
>  
> Reproducer. 
> Spark 3.3.0 Using Spark-shell 
> Downloaded the file 
> [parquet-hadoop-1.12.0-tests.jar|https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop/1.12.0/parquet-hadoop-1.12.0-tests.jar]
>  and added it to spark-jars folder
> Code to create encrypted data. #  
>  
> {code:java}
> sc.hadoopConfiguration.set("parquet.crypto.factory.class" 
> ,"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
> sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" 
> ,"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
> sc.hadoopConfiguration.set("parquet.encryption.key.list","key1a: 
> BAECAwQFBgcICQoLDA0ODw==, key2a: BAECAAECAAECAAECAAECAA==, keyz: 
> BAECAAECAAECAAECAAECAA==")
> sc.hadoopConfiguration.set("parquet.encryption.key.material.store.internally","false")
> val encryptedParquetPath = "/tmp/par_enc_footer_non_encrypted"
> valpartitionCol = 1
> case class nestedItem(ic: Int = 0, sic : Double, pc: Int = 0)
> case class SquareItem(int_column: Int, square_int_column : Double, 
> partitionCol: Int, nestedCol :nestedItem)
> val dataRange = (1 to 100).toList
> val squares = sc.parallelize(dataRange.map(i => new SquareItem(i, 
> scala.math.pow(i,2), partitionCol,nestedItem(i,i
> squares.toDS().show()
> squares.toDS().write.partitionBy("partitionCol").mode("overwrite").option("parquet.encryption.column.keys",
>  
> "key1a:square_int_column,nestedCol.ic;").option("parquet.encryption.plaintext.footer",true).option("parquet.encryption.footer.key",
>  "keyz").parquet(encryptedParquetPath)
> {code}
> Code to read the data trying to access non encrypted nested field by opening 
> a new spark-shell
>  
> {code:java}
> val encryptedParquetPath = "/tmp/par_enc_footer_non_encrypted"
> spark.sqlContext.read.parquet(encryptedParquetPath).createOrReplaceTempView("test")
> spark.sql("select nestedCol.sic from test").show(){code}
> As you can see that nestedCol.sic is not encrypted , I was expecting the 
> results, but
> I get the below error
>  
> {code:java}
> Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: 
> [square_int_column]. Null File Decryptor
>   at 
> org.apache.parquet.hadoop.metadata.EncryptedColumnChunkMetaData.decryptIfNeeded(ColumnChunkMetaData.java:602)
>   at 
> org.apache.parquet.hadoop.metadata.ColumnChunkMetaData.getEncodings(ColumnChunkMetaData.java:348)
>   at 
> org.apache.parquet.hadoop.ParquetRecordReader.checkDeltaByteArrayProblem(ParquetRecordReader.java:191)
>   at 
> org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:177)
>   at 
> org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:375)
>   at 
> 

[jira] [Commented] (PARQUET-2292) Improve default SpecificRecord model selection for Avro{Write,Read}Support

2023-05-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718485#comment-17718485
 ] 

ASF GitHub Bot commented on PARQUET-2292:
-

gszadovszky commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182232703


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {
+final Field conversionsField = clazz.getDeclaredField("conversions");
+conversionsField.setAccessible(true);
+
+final Conversion[] conversions = (Conversion[]) 
conversionsField.get(null);
+
Arrays.stream(conversions).filter(Objects::nonNull).forEach(model::addLogicalTypeConversion);
+  }
+} catch (Exception e) {

Review Comment:
   Similar to the previous comment about logging and catching specific 
exceptions.
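
   A small sketch of that narrowing applied to the `conversions` block (the `LOG` field is an
assumption here, since slf4j would first have to be promoted to compile scope as discussed on
this PR):

   ```java
   // Sketch: catch only the reflective failures we expect, and say what happened.
   try {
     final Field conversionsField = clazz.getDeclaredField("conversions");
     conversionsField.setAccessible(true);
     final Conversion<?>[] conversions = (Conversion<?>[]) conversionsField.get(null);
     Arrays.stream(conversions).filter(Objects::nonNull).forEach(model::addLogicalTypeConversion);
   } catch (NoSuchFieldException | IllegalAccessException e) {
     LOG.info("Could not reflect logical-type conversions from {}; returning model as-is", clazz, e);
   }
   return model;
   ```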



##
parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java:
##
@@ -237,6 +242,43 @@ public void testAvroReadSchema() throws IOException {
 }
   }
 
+  @Test

Review Comment:
   We need to test both code paths of `getModelForSchema` related to the Avro version. If 
the version check gets more complicated, more versions may need to be covered.



##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {

Review Comment:
   TBH I do not have a strong opinion on any. I am fine with the current one if 
it works.



##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {

Review Comment:
   Since we are using reflection on private members there are no compatibility 
guarantees. We should be very careful here. What about Avro versions prior to 
1.8? Also, what if it breaks in the future? Will the related unit test fail for 
future Avro releases (in case of upgrading the Avro version in the pom)?
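
   One extra defensive detail worth noting while touching this spot (illustrative only):
`Package#getImplementationVersion()` can return `null`, for example when the classes are not
loaded from a jar with a manifest, so the version probe may want a null guard:

   ```java
   final String avroVersion = Schema.Parser.class.getPackage().getImplementationVersion();
   // getImplementationVersion() is null when there is no manifest to read it from
   if (avroVersion != null && avroVersion.startsWith("1.8.")) {
     // ... reflect the `conversions` field as in the PR
   }
   ```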



##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = 

[GitHub] [parquet-mr] gszadovszky commented on a diff in pull request #1078: PARQUET-2292: Default SpecificRecord model reflects from MODEL$ field

2023-05-02 Thread via GitHub


gszadovszky commented on code in PR #1078:
URL: https://github.com/apache/parquet-mr/pull/1078#discussion_r1182232703


##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {
+final Field conversionsField = clazz.getDeclaredField("conversions");
+conversionsField.setAccessible(true);
+
+final Conversion[] conversions = (Conversion[]) 
conversionsField.get(null);
+
Arrays.stream(conversions).filter(Objects::nonNull).forEach(model::addLogicalTypeConversion);
+  }
+} catch (Exception e) {

Review Comment:
   Similar to the previous comment about logging and catching specific 
exceptions.



##
parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java:
##
@@ -237,6 +242,43 @@ public void testAvroReadSchema() throws IOException {
 }
   }
 
+  @Test

Review Comment:
   We need to test both code paths of `getModelForSchema` related to the Avro version. If 
the version check gets more complicated, more versions may need to be covered.



##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {

Review Comment:
   TBH I do not have a strong opinion on any. I am fine with the current one if 
it works.



##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model = (SpecificData) modelField.get(null);
+} catch (Exception e) {
+  return null;
+}
+
+try {
+  final String avroVersion = 
Schema.Parser.class.getPackage().getImplementationVersion();
+  // Avro 1.8 doesn't include conversions in the MODEL$ field
+  if (avroVersion.startsWith("1.8.")) {

Review Comment:
   Since we are using reflection on private members there are no compatibility 
guarantees. We should be very careful here. What about Avro versions prior to 
1.8? Also, what if it breaks in the future? Will the related unit test fail for 
future Avro releases (in case of upgrading the Avro version in the pom)?



##
parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java:
##
@@ -169,6 +172,46 @@ public void add(Object value) {
 }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by 
reflecting the underlying
+   * Avro class's `MODEL$` field, or Null if the class is not on the classpath 
or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+final Class clazz;
+
+if (schema != null && (schema.getType() == Schema.Type.RECORD || 
schema.getType() == Schema.Type.UNION)) {
+  clazz = SpecificData.get().getClass(schema);
+} else {
+  return null;
+}
+
+final SpecificData model;
+try {
+  final Field modelField = clazz.getDeclaredField("MODEL$");
+  modelField.setAccessible(true);
+
+  model =