xiangfu0 commented on code in PR #18233:
URL: https://github.com/apache/pinot/pull/18233#discussion_r3106114937
##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufCodeGenMessageDecoder.java:
##########
@@ -47,14 +92,50 @@ public void init(Map<String, String> props, Set<String>
fieldsToRead, String top
"Protocol Buffer schema jar file must be provided");
Preconditions.checkState(props.containsKey(PROTO_CLASS_NAME),
"Protocol Buffer Message class name must be provided");
- String protoClassName = props.getOrDefault(PROTO_CLASS_NAME, "");
- String jarPath = props.getOrDefault(PROTOBUF_JAR_FILE_PATH, "");
- ClassLoader protoMessageClsLoader = loadClass(jarPath);
- Descriptors.Descriptor descriptor =
getDescriptorForProtoClass(protoMessageClsLoader, protoClassName);
+ String jarPath = props.get(PROTOBUF_JAR_FILE_PATH);
+ String protoClassName = props.get(PROTO_CLASS_NAME);
+
+ File localFile = resolveJar(topicName, jarPath);
+ URLClassLoader loader = new URLClassLoader(new
URL[]{localFile.toURI().toURL()});
+ Descriptors.Descriptor descriptor = getDescriptorForProtoClass(loader,
protoClassName);
String codeGenCode = new MessageCodeGen().codegen(descriptor,
fieldsToRead);
- Class<?> recordExtractor = compileClass(protoMessageClsLoader,
+ Class<?> recordExtractor = compileClass(loader,
MessageCodeGen.EXTRACTOR_PACKAGE_NAME + "." +
MessageCodeGen.EXTRACTOR_CLASS_NAME, codeGenCode);
_decodeMethod =
recordExtractor.getMethod(MessageCodeGen.EXTRACTOR_METHOD_NAME, byte[].class,
GenericRow.class);
+ loader.close();
+ }
+
+ /**
+ * Returns the local {@link File} for the given {@code jarPath}, fetching it
from remote storage if needed.
+ *
+ * <p>Fast path: if the cache already holds a local copy for this topic with
the same remote path, return it
+ * immediately with no network I/O.
+ *
+ * <p>Slow path: fetch the JAR, update the cache, register the temp dir for
deletion on JVM exit.
+ *
+ * <p>Stale fallback: if the fetch fails and a prior local copy exists (e.g.
from an older {@code jarPath}),
+ * log a warning and return the stale file so segment creation can proceed.
+ */
+ private static File resolveJar(String topicName, String jarPath)
+ throws Exception {
+ CachedJar cached = JAR_CACHE.get(topicName);
+ if (cached != null && cached._jarPath.equals(jarPath) &&
cached._localFile.exists()) {
Review Comment:
hmm, then another solution might be just keep the crc for the jar as a map
then you can still perform a periodically fetch, but only replace when jar crc
mismatch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]