This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch security/pinot-avro-remove-md5-wording-clean
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 56f01c262b764647236dc43623b07f71ce12c1be
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Mar 2 17:21:58 2026 -0800

    Refactor schema hash naming in KafkaAvroMessageDecoder.
    
    This removes MD5-specific wording from internal identifiers and comments in 
the pinot-avro decoder to align terminology with FIPS-compliant hash usage 
without changing behavior.
    
    Made-with: Cursor
---
 .../inputformat/avro/KafkaAvroMessageDecoder.java  | 70 +++++++++++-----------
 1 file changed, 35 insertions(+), 35 deletions(-)

diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
 
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
index c21dbe2356a..3d1f39afc40 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
@@ -60,15 +60,15 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
   private static final String SCHEMA_REGISTRY_REST_URL = 
"schema.registry.rest.url";
   private static final String SCHEMA_REGISTRY_SCHEMA_NAME = 
"schema.registry.schema.name";
   private org.apache.avro.Schema _defaultAvroSchema;
-  private MD5AvroSchemaMap _md5ToAvroSchemaMap;
+  private HashToSchemaMap _hashToSchemaMap;
 
   // A global cache for schemas across all threads.
   private static final Map<String, org.apache.avro.Schema> GLOBAL_SCHEMA_CACHE 
= new HashMap<>();
   // Suffix for getting the latest schema
   private static final String LATEST = "-latest";
 
-  // Reusable byte[] to read MD5 from payload. This is OK as this class is 
used only by a single thread.
-  private final byte[] _reusableMD5Bytes = new byte[SCHEMA_HASH_LENGTH];
+  // Reusable byte[] to read hash from payload. This class is used only by a 
single thread.
+  private final byte[] _reusableHashBytes = new byte[SCHEMA_HASH_LENGTH];
 
   private DecoderFactory _decoderFactory;
   private RecordExtractor<GenericData.Record> _avroRecordExtractor;
@@ -98,9 +98,9 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
     // With the logic below, we may not set defaultAvroSchema to be the latest 
one everytime.
     // The schema is fetched once when the machine starts. Until the next 
restart. the latest schema is
     // not fetched.
-    // But then we always pay attention to the exact MD5 hash and attempt to 
fetch the schema for that particular hash
-    // before decoding an incoming kafka event. We use defaultAvroSchema only 
if the fetch for the particular MD5 fails,
-    // but then we will retry that fetch on every event in case of failure.
+    // But then we always pay attention to the exact hash and attempt to fetch 
the schema for that particular hash
+    // before decoding an incoming kafka event. We use defaultAvroSchema only 
if the fetch for the particular hash
+    // fails, but then we will retry that fetch on every event in case of 
failure.
     synchronized (GLOBAL_SCHEMA_CACHE) {
       final String hashKey = avroSchemaName + LATEST;
       _defaultAvroSchema = GLOBAL_SCHEMA_CACHE.get(hashKey);
@@ -125,7 +125,7 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
     _avroRecordExtractor = 
PluginManager.get().createInstance(recordExtractorClass);
     _avroRecordExtractor.init(fieldsToRead, config);
     _decoderFactory = new DecoderFactory();
-    _md5ToAvroSchemaMap = new MD5AvroSchemaMap();
+    _hashToSchemaMap = new HashToSchemaMap();
   }
 
   @Nullable
@@ -142,25 +142,25 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
       return null;
     }
 
-    System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, 
_reusableMD5Bytes, 0, SCHEMA_HASH_LENGTH);
+    System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, 
_reusableHashBytes, 0, SCHEMA_HASH_LENGTH);
 
     boolean schemaUpdateFailed = false;
-    org.apache.avro.Schema schema = 
_md5ToAvroSchemaMap.getSchema(_reusableMD5Bytes);
+    org.apache.avro.Schema schema = 
_hashToSchemaMap.getSchema(_reusableHashBytes);
     if (schema == null) {
       // We will get here for the first row consumed in the segment, and every 
row that has a schema ID that is
-      // not yet in md5ToAvroSchemaMap.
+      // not yet in _hashToSchemaMap.
       synchronized (GLOBAL_SCHEMA_CACHE) {
-        final String hashKey = hex(_reusableMD5Bytes);
+        final String hashKey = hex(_reusableHashBytes);
         schema = GLOBAL_SCHEMA_CACHE.get(hashKey);
         if (schema == null) {
           // We will get here only if no partition of the table has populated 
the global schema cache.
           // In that case, one of the consumers will fetch the schema and 
populate the cache, and the others
should find it in the cache and proceed without fetching it again.
-          final String schemaUri = "/id=" + hex(_reusableMD5Bytes);
+          final String schemaUri = "/id=" + hex(_reusableHashBytes);
           try {
             schema = fetchSchema(schemaUri);
             GLOBAL_SCHEMA_CACHE.put(hashKey, schema);
-            _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema);
+            _hashToSchemaMap.addSchema(_reusableHashBytes, schema);
           } catch (Exception e) {
             schema = _defaultAvroSchema;
             LOGGER.error("Error fetching schema using url {}. Attempting to 
continue with previous schema", schemaUri,
@@ -169,7 +169,7 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
           }
         } else {
           LOGGER.info("Found schema for {} in cache", hashKey);
-          _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema);
+          _hashToSchemaMap.addSchema(_reusableHashBytes, schema);
         }
       }
     }
@@ -199,8 +199,8 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
 
   private static class SchemaFetcher implements Callable<Boolean> {
     private org.apache.avro.Schema _schema;
-    private URL _url;
-    private boolean _isSuccessful = false;
+    private final URL _url;
+    private boolean _isSuccessful = false;
 
     SchemaFetcher(URL url) {
       _url = url;
@@ -248,33 +248,33 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
   }
 
   /**
-   * Private class for encapsulating MD5 to Avro schema mapping.
+   * Private class for encapsulating hash to Avro schema mapping.
    * <ul>
-   *   <li> Maintains two lists, one for md5s and another for schema. </li>
-   *   <li> MD5 at index i in the MD5 list, corresponds to Schema at index i 
in the schema list. </li>
+   *   <li> Maintains two lists, one for hashes and another for schemas. </li>
+   *   <li> Hash at index i in the hash list corresponds to schema at index i 
in the schema list. </li>
    * </ul>
    */
-  private static class MD5AvroSchemaMap {
-    private List<byte[]> _md5s;
-    private List<org.apache.avro.Schema> _schemas;
+  private static class HashToSchemaMap {
+    private final List<byte[]> _hashes;
+    private final List<org.apache.avro.Schema> _schemas;
 
     /**
      * Constructor for the class.
      */
-    private MD5AvroSchemaMap() {
-      _md5s = new ArrayList<>();
+    private HashToSchemaMap() {
+      _hashes = new ArrayList<>();
       _schemas = new ArrayList<>();
     }
 
     /**
-     * Returns the Avro schema corresponding to the given MD5.
+     * Returns the Avro schema corresponding to the given hash.
      *
-     * @param md5ForSchema MD5 for which to get the avro schema.
-     * @return Avro schema for the given MD5.
+     * @param hash Hash for which to get the avro schema.
+     * @return Avro schema for the given hash.
      */
-    private org.apache.avro.Schema getSchema(byte[] md5ForSchema) {
-      for (int i = 0; i < _md5s.size(); i++) {
-        if (Arrays.equals(_md5s.get(i), md5ForSchema)) {
+    private org.apache.avro.Schema getSchema(byte[] hash) {
+      for (int i = 0; i < _hashes.size(); i++) {
+        if (Arrays.equals(_hashes.get(i), hash)) {
           return _schemas.get(i);
         }
       }
@@ -282,14 +282,14 @@ public class KafkaAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
     }
 
     /**
-     * Adds mapping between MD5 and Avro schema.
-     * Caller to ensure that addSchema is called only once per MD5-Schema pair.
+     * Adds mapping between hash and avro schema.
+     * Caller to ensure that addSchema is called only once per hash-schema 
pair.
      *
-     * @param md5 MD5 for the Schema
+     * @param hash Hash for the schema
      * @param schema Avro Schema
      */
-    private void addSchema(byte[] md5, org.apache.avro.Schema schema) {
-      _md5s.add(Arrays.copyOf(md5, md5.length));
+    private void addSchema(byte[] hash, org.apache.avro.Schema schema) {
+      _hashes.add(Arrays.copyOf(hash, hash.length));
       _schemas.add(schema);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to