rseetham commented on code in PR #18233:
URL: https://github.com/apache/pinot/pull/18233#discussion_r3092439300
##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufCodeGenMessageDecoder.java:
##########
@@ -47,14 +92,50 @@ public void init(Map<String, String> props, Set<String>
fieldsToRead, String top
"Protocol Buffer schema jar file must be provided");
Preconditions.checkState(props.containsKey(PROTO_CLASS_NAME),
"Protocol Buffer Message class name must be provided");
- String protoClassName = props.getOrDefault(PROTO_CLASS_NAME, "");
- String jarPath = props.getOrDefault(PROTOBUF_JAR_FILE_PATH, "");
- ClassLoader protoMessageClsLoader = loadClass(jarPath);
- Descriptors.Descriptor descriptor =
getDescriptorForProtoClass(protoMessageClsLoader, protoClassName);
+ String jarPath = props.get(PROTOBUF_JAR_FILE_PATH);
+ String protoClassName = props.get(PROTO_CLASS_NAME);
+
+ File localFile = resolveJar(topicName, jarPath);
+ URLClassLoader loader = new URLClassLoader(new
URL[]{localFile.toURI().toURL()});
+ Descriptors.Descriptor descriptor = getDescriptorForProtoClass(loader,
protoClassName);
String codeGenCode = new MessageCodeGen().codegen(descriptor,
fieldsToRead);
- Class<?> recordExtractor = compileClass(protoMessageClsLoader,
+ Class<?> recordExtractor = compileClass(loader,
MessageCodeGen.EXTRACTOR_PACKAGE_NAME + "." +
MessageCodeGen.EXTRACTOR_CLASS_NAME, codeGenCode);
_decodeMethod =
recordExtractor.getMethod(MessageCodeGen.EXTRACTOR_METHOD_NAME, byte[].class,
GenericRow.class);
+ loader.close();
+ }
+
+ /**
+ * Returns the local {@link File} for the given {@code jarPath}, fetching it
from remote storage if needed.
+ *
+ * <p>Fast path: if the cache already holds a local copy for this topic with
the same remote path, return it
+ * immediately with no network I/O.
+ *
+ * <p>Slow path: fetch the JAR, update the cache, register the temp dir for
deletion on JVM exit.
+ *
+ * <p>Stale fallback: if the fetch fails and a prior local copy exists (e.g.
from an older {@code jarPath}),
+ * log a warning and return the stale file so segment creation can proceed.
+ */
+ private static File resolveJar(String topicName, String jarPath)
+ throws Exception {
+ CachedJar cached = JAR_CACHE.get(topicName);
+ if (cached != null && cached._jarPath.equals(jarPath) &&
cached._localFile.exists()) {
+ return cached._localFile;
Review Comment:
For a given Kafka topic, the data can only be in one format, so the same jar
is always used to read from it. If the schemas of different tables differ, only
the fields each table needs will be extracted for that table. Here we are only
caching the jar so that we don't have to keep fetching it every time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]