This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch security/pinot-avro-remove-md5-wording-clean in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 56f01c262b764647236dc43623b07f71ce12c1be Author: Xiang Fu <[email protected]> AuthorDate: Mon Mar 2 17:21:58 2026 -0800 Refactor schema hash naming in KafkaAvroMessageDecoder. This removes MD5-specific wording from internal identifiers and comments in the pinot-avro decoder to align terminology with FIPS-compliant hash usage without changing behavior. Made-with: Cursor --- .../inputformat/avro/KafkaAvroMessageDecoder.java | 70 +++++++++++----------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java index c21dbe2356a..3d1f39afc40 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java +++ b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java @@ -60,15 +60,15 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url"; private static final String SCHEMA_REGISTRY_SCHEMA_NAME = "schema.registry.schema.name"; private org.apache.avro.Schema _defaultAvroSchema; - private MD5AvroSchemaMap _md5ToAvroSchemaMap; + private HashToSchemaMap _hashToSchemaMap; // A global cache for schemas across all threads. private static final Map<String, org.apache.avro.Schema> GLOBAL_SCHEMA_CACHE = new HashMap<>(); // Suffix for getting the latest schema private static final String LATEST = "-latest"; - // Reusable byte[] to read MD5 from payload. This is OK as this class is used only by a single thread. - private final byte[] _reusableMD5Bytes = new byte[SCHEMA_HASH_LENGTH]; + // Reusable byte[] to read hash from payload. 
This class is used only by a single thread. + private final byte[] _reusableHashBytes = new byte[SCHEMA_HASH_LENGTH]; private DecoderFactory _decoderFactory; private RecordExtractor<GenericData.Record> _avroRecordExtractor; @@ -98,9 +98,9 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { // With the logic below, we may not set defaultAvroSchema to be the latest one everytime. // The schema is fetched once when the machine starts. Until the next restart. the latest schema is // not fetched. - // But then we always pay attention to the exact MD5 hash and attempt to fetch the schema for that particular hash - // before decoding an incoming kafka event. We use defaultAvroSchema only if the fetch for the particular MD5 fails, - // but then we will retry that fetch on every event in case of failure. + // But then we always pay attention to the exact hash and attempt to fetch the schema for that particular hash + // before decoding an incoming kafka event. We use defaultAvroSchema only if the fetch for the particular hash + // fails, but then we will retry that fetch on every event in case of failure. 
synchronized (GLOBAL_SCHEMA_CACHE) { final String hashKey = avroSchemaName + LATEST; _defaultAvroSchema = GLOBAL_SCHEMA_CACHE.get(hashKey); @@ -125,7 +125,7 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { _avroRecordExtractor = PluginManager.get().createInstance(recordExtractorClass); _avroRecordExtractor.init(fieldsToRead, config); _decoderFactory = new DecoderFactory(); - _md5ToAvroSchemaMap = new MD5AvroSchemaMap(); + _hashToSchemaMap = new HashToSchemaMap(); } @Nullable @@ -142,25 +142,25 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { return null; } - System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, _reusableMD5Bytes, 0, SCHEMA_HASH_LENGTH); + System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, _reusableHashBytes, 0, SCHEMA_HASH_LENGTH); boolean schemaUpdateFailed = false; - org.apache.avro.Schema schema = _md5ToAvroSchemaMap.getSchema(_reusableMD5Bytes); + org.apache.avro.Schema schema = _hashToSchemaMap.getSchema(_reusableHashBytes); if (schema == null) { // We will get here for the first row consumed in the segment, and every row that has a schema ID that is - // not yet in md5ToAvroSchemaMap. + // not yet in hashToAvroSchemaMap. synchronized (GLOBAL_SCHEMA_CACHE) { - final String hashKey = hex(_reusableMD5Bytes); + final String hashKey = hex(_reusableHashBytes); schema = GLOBAL_SCHEMA_CACHE.get(hashKey); if (schema == null) { // We will get here only if no partition of the table has populated the global schema cache. 
// In that case, one of the consumers will fetch the schema and populate the cache, and the others // should find it in the cache and populate their own maps. - final String schemaUri = "/id=" + hex(_reusableMD5Bytes); + final String schemaUri = "/id=" + hex(_reusableHashBytes); try { schema = fetchSchema(schemaUri); GLOBAL_SCHEMA_CACHE.put(hashKey, schema); - _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema); + _hashToSchemaMap.addSchema(_reusableHashBytes, schema); } catch (Exception e) { schema = _defaultAvroSchema; LOGGER.error("Error fetching schema using url {}. Attempting to continue with previous schema", schemaUri, @@ -169,7 +169,7 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { } } else { LOGGER.info("Found schema for {} in cache", hashKey); - _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema); + _hashToSchemaMap.addSchema(_reusableHashBytes, schema); } } } @@ -199,8 +199,8 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { private static class SchemaFetcher implements Callable<Boolean> { private org.apache.avro.Schema _schema; - private URL _url; - private boolean _isSuccessful = false; + private final URL _url; + private boolean _isSuccessful = false; SchemaFetcher(URL url) { _url = url; } @@ -248,33 +248,33 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { } /** - * Private class for encapsulating MD5 to Avro schema mapping. + * Private class for encapsulating hash to Avro schema mapping. * <ul> - * <li> Maintains two lists, one for md5s and another for schema. </li> - * <li> MD5 at index i in the MD5 list, corresponds to Schema at index i in the schema list. </li> + * <li> Maintains two lists, one for hashes and another for schema. </li> + * <li> Hash at index i in the hash list corresponds to schema at index i in the schema list. 
</li> * </ul> */ - private static class MD5AvroSchemaMap { - private List<byte[]> _md5s; - private List<org.apache.avro.Schema> _schemas; + private static class HashToSchemaMap { + private final List<byte[]> _hashes; + private final List<org.apache.avro.Schema> _schemas; /** * Constructor for the class. */ - private MD5AvroSchemaMap() { - _md5s = new ArrayList<>(); + private HashToSchemaMap() { + _hashes = new ArrayList<>(); _schemas = new ArrayList<>(); } /** - * Returns the Avro schema corresponding to the given MD5. + * Returns the Avro schema corresponding to the given hash. * - * @param md5ForSchema MD5 for which to get the avro schema. - * @return Avro schema for the given MD5. + * @param hash Hash for which to get the avro schema. + * @return Avro schema for the given hash. */ - private org.apache.avro.Schema getSchema(byte[] md5ForSchema) { - for (int i = 0; i < _md5s.size(); i++) { - if (Arrays.equals(_md5s.get(i), md5ForSchema)) { + private org.apache.avro.Schema getSchema(byte[] hash) { + for (int i = 0; i < _hashes.size(); i++) { + if (Arrays.equals(_hashes.get(i), hash)) { return _schemas.get(i); } } @@ -282,14 +282,14 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { } /** - * Adds mapping between MD5 and Avro schema. - * Caller to ensure that addSchema is called only once per MD5-Schema pair. + * Adds mapping between hash and avro schema. + * Caller to ensure that addSchema is called only once per hash-schema pair. 
* - * @param md5 MD5 for the Schema + * @param hash Hash for the schema * @param schema Avro Schema */ - private void addSchema(byte[] md5, org.apache.avro.Schema schema) { - _md5s.add(Arrays.copyOf(md5, md5.length)); + private void addSchema(byte[] hash, org.apache.avro.Schema schema) { + _hashes.add(Arrays.copyOf(hash, hash.length)); _schemas.add(schema); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
