This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9648c99cf5a Refactor schema hash naming in KafkaAvroMessageDecoder.
(#17798)
9648c99cf5a is described below
commit 9648c99cf5a229d96b42ca456c167d9935e85270
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Mar 2 23:36:41 2026 -0800
Refactor schema hash naming in KafkaAvroMessageDecoder. (#17798)
This removes MD5-specific wording from internal identifiers and comments in
the pinot-avro decoder to align terminology with FIPS-compliant hash usage
without changing behavior.
Made-with: Cursor
---
.../inputformat/avro/KafkaAvroMessageDecoder.java | 92 +++++++++++-----------
1 file changed, 45 insertions(+), 47 deletions(-)
diff --git
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
index c21dbe2356a..56ba6958486 100644
---
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
+++
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
@@ -59,16 +60,16 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
private static final String SCHEMA_REGISTRY_REST_URL =
"schema.registry.rest.url";
private static final String SCHEMA_REGISTRY_SCHEMA_NAME =
"schema.registry.schema.name";
- private org.apache.avro.Schema _defaultAvroSchema;
- private MD5AvroSchemaMap _md5ToAvroSchemaMap;
+ private Schema _defaultAvroSchema;
+ private HashToSchemaMap _hashToSchemaMap;
// A global cache for schemas across all threads.
- private static final Map<String, org.apache.avro.Schema> GLOBAL_SCHEMA_CACHE
= new HashMap<>();
+ private static final Map<String, Schema> GLOBAL_SCHEMA_CACHE = new
HashMap<>();
// Suffix for getting the latest schema
private static final String LATEST = "-latest";
- // Reusable byte[] to read MD5 from payload. This is OK as this class is
used only by a single thread.
- private final byte[] _reusableMD5Bytes = new byte[SCHEMA_HASH_LENGTH];
+ // Reusable byte[] to read hash from payload. This class is used only by a
single thread.
+ private final byte[] _reusableHashBytes = new byte[SCHEMA_HASH_LENGTH];
private DecoderFactory _decoderFactory;
private RecordExtractor<GenericData.Record> _avroRecordExtractor;
@@ -98,9 +99,9 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
// With the logic below, we may not set defaultAvroSchema to be the latest
one everytime.
// The schema is fetched once when the machine starts. Until the next
restart. the latest schema is
// not fetched.
- // But then we always pay attention to the exact MD5 hash and attempt to
fetch the schema for that particular hash
- // before decoding an incoming kafka event. We use defaultAvroSchema only
if the fetch for the particular MD5 fails,
- // but then we will retry that fetch on every event in case of failure.
+ // But then we always pay attention to the exact hash and attempt to fetch
the schema for that particular hash
+ // before decoding an incoming kafka event. We use defaultAvroSchema only
if the fetch for the particular hash
+ // fails, but then we will retry that fetch on every event in case of
failure.
synchronized (GLOBAL_SCHEMA_CACHE) {
final String hashKey = avroSchemaName + LATEST;
_defaultAvroSchema = GLOBAL_SCHEMA_CACHE.get(hashKey);
@@ -125,7 +126,7 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
_avroRecordExtractor =
PluginManager.get().createInstance(recordExtractorClass);
_avroRecordExtractor.init(fieldsToRead, config);
_decoderFactory = new DecoderFactory();
- _md5ToAvroSchemaMap = new MD5AvroSchemaMap();
+ _hashToSchemaMap = new HashToSchemaMap();
}
@Nullable
@@ -142,25 +143,25 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
return null;
}
- System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset,
_reusableMD5Bytes, 0, SCHEMA_HASH_LENGTH);
+ System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset,
_reusableHashBytes, 0, SCHEMA_HASH_LENGTH);
boolean schemaUpdateFailed = false;
- org.apache.avro.Schema schema =
_md5ToAvroSchemaMap.getSchema(_reusableMD5Bytes);
+ Schema schema = _hashToSchemaMap.getSchema(_reusableHashBytes);
if (schema == null) {
// We will get here for the first row consumed in the segment, and every
row that has a schema ID that is
- // not yet in md5ToAvroSchemaMap.
+ // not yet in hashToAvroSchemaMap.
synchronized (GLOBAL_SCHEMA_CACHE) {
- final String hashKey = hex(_reusableMD5Bytes);
+ final String hashKey = hex(_reusableHashBytes);
schema = GLOBAL_SCHEMA_CACHE.get(hashKey);
if (schema == null) {
// We will get here only if no partition of the table has populated
the global schema cache.
// In that case, one of the consumers will fetch the schema and
populate the cache, and the others
- // should find it in the cache and po
- final String schemaUri = "/id=" + hex(_reusableMD5Bytes);
+ // should find it in the cache and populate their local schema maps.
+ final String schemaUri = "/id=" + hex(_reusableHashBytes);
try {
schema = fetchSchema(schemaUri);
GLOBAL_SCHEMA_CACHE.put(hashKey, schema);
- _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema);
+ _hashToSchemaMap.addSchema(_reusableHashBytes, schema);
} catch (Exception e) {
schema = _defaultAvroSchema;
LOGGER.error("Error fetching schema using url {}. Attempting to
continue with previous schema", schemaUri,
@@ -169,7 +170,7 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
}
} else {
LOGGER.info("Found schema for {} in cache", hashKey);
- _md5ToAvroSchemaMap.addSchema(_reusableMD5Bytes, schema);
+ _hashToSchemaMap.addSchema(_reusableHashBytes, schema);
}
}
}
@@ -198,22 +199,20 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
}
private static class SchemaFetcher implements Callable<Boolean> {
- private org.apache.avro.Schema _schema;
- private URL _url;
- private boolean _isSuccessful = false;
+ private Schema _schema;
+ private final URL _url;
SchemaFetcher(URL url) {
_url = url;
}
@Override
- public Boolean call()
- throws Exception {
+ public Boolean call() {
try {
URLConnection conn = _url.openConnection();
conn.setConnectTimeout(15000);
conn.setReadTimeout(15000);
- LOGGER.info("Fetching schema using url {}", _url.toString());
+ LOGGER.info("Fetching schema using url {}", _url);
StringBuilder queryResp = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
@@ -222,10 +221,9 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
queryResp.append(line);
}
}
- _schema = org.apache.avro.Schema.parse(queryResp.toString());
-
- LOGGER.info("Schema fetch succeeded on url {}", _url.toString());
+ _schema = Schema.parse(queryResp.toString());
+ LOGGER.info("Schema fetch succeeded on url {}", _url);
return Boolean.TRUE;
} catch (Exception e) {
LOGGER.warn("Caught exception while fetching schema", e);
@@ -233,12 +231,12 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
}
}
- public org.apache.avro.Schema getSchema() {
+ public Schema getSchema() {
return _schema;
}
}
- private org.apache.avro.Schema fetchSchema(String reference)
+ private Schema fetchSchema(String reference)
throws Exception {
SchemaFetcher schemaFetcher = new SchemaFetcher(makeRandomUrl(reference));
RetryPolicies
@@ -248,33 +246,33 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
}
/**
- * Private class for encapsulating MD5 to Avro schema mapping.
+ * Private class for encapsulating hash to Avro schema mapping.
* <ul>
- * <li> Maintains two lists, one for md5s and another for schema. </li>
- * <li> MD5 at index i in the MD5 list, corresponds to Schema at index i
in the schema list. </li>
+ * <li> Maintains two lists, one for hashes and another for schema. </li>
+ * <li> Hash at index i in the hash list corresponds to schema at index i
in the schema list. </li>
* </ul>
*/
- private static class MD5AvroSchemaMap {
- private List<byte[]> _md5s;
- private List<org.apache.avro.Schema> _schemas;
+ private static class HashToSchemaMap {
+ private final List<byte[]> _hashes;
+ private final List<Schema> _schemas;
/**
* Constructor for the class.
*/
- private MD5AvroSchemaMap() {
- _md5s = new ArrayList<>();
+ private HashToSchemaMap() {
+ _hashes = new ArrayList<>();
_schemas = new ArrayList<>();
}
/**
- * Returns the Avro schema corresponding to the given MD5.
+ * Returns the Avro schema corresponding to the given hash.
*
- * @param md5ForSchema MD5 for which to get the avro schema.
- * @return Avro schema for the given MD5.
+ * @param hash Hash for which to get the avro schema.
+ * @return Avro schema for the given hash.
*/
- private org.apache.avro.Schema getSchema(byte[] md5ForSchema) {
- for (int i = 0; i < _md5s.size(); i++) {
- if (Arrays.equals(_md5s.get(i), md5ForSchema)) {
+ private Schema getSchema(byte[] hash) {
+ for (int i = 0; i < _hashes.size(); i++) {
+ if (Arrays.equals(_hashes.get(i), hash)) {
return _schemas.get(i);
}
}
@@ -282,14 +280,14 @@ public class KafkaAvroMessageDecoder implements
StreamMessageDecoder<byte[]> {
}
/**
- * Adds mapping between MD5 and Avro schema.
- * Caller to ensure that addSchema is called only once per MD5-Schema pair.
+ * Adds mapping between hash and avro schema.
+ * Caller to ensure that addSchema is called only once per hash-schema
pair.
*
- * @param md5 MD5 for the Schema
+ * @param hash Hash for the schema
* @param schema Avro Schema
*/
- private void addSchema(byte[] md5, org.apache.avro.Schema schema) {
- _md5s.add(Arrays.copyOf(md5, md5.length));
+ private void addSchema(byte[] hash, Schema schema) {
+ _hashes.add(Arrays.copyOf(hash, hash.length));
_schemas.add(schema);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]