[GitHub] [pulsar] sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe
sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r339349691

## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java ##

@@ -163,6 +168,43 @@ private static boolean isProtobufClass(Class pojoClazz) {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private Schema newSchemaInstance(String topic, Class clazz, String schemaTypeOrClassName, boolean input, ClassLoader classLoader) {
+        // The schemaTypeOrClassName can represent multiple thing, either a schema type, a schema class name or a ser-de
+        // class name.
+
+        if (StringUtils.isEmpty(schemaTypeOrClassName) || DEFAULT_SERDE.equals(schemaTypeOrClassName)) {
+            // No preferred schema was provided, auto-discover schema or fallback to defaults
+            return newSchemaInstance(clazz, getSchemaTypeOrDefault(topic, clazz));
+        }
+
+        SchemaType schemaType = null;
+        try {
+            schemaType = SchemaType.valueOf(schemaTypeOrClassName.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            // schemaType is not referring to builtin type
+        }
+
+        if (schemaType != null) {
+            // The parameter passed was indeed a valid builtin schema type
+            return newSchemaInstance(clazz, schemaType);
+        }
+
+        // At this point, the string can represent either a schema or serde class name. Create an instance and
+        // check if it complies with either interface
+
+        // First try with Schema
+        try {
+            return (Schema) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
+                    classLoader, clazz, input);
+        } catch (Throwable t) {
+            // Now try with Serde or just fail
+            SerDe serDe = (SerDe) InstanceUtils.initializeSerDe(schemaTypeOrClassName,
+                    classLoader, clazz, input);
+            return new SerDeSchema<>(serDe);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private Schema newSchemaInstance(String topic, Class clazz, String schemaTypeOrClassName, boolean input) {

Review comment:
Can you rewrite this method by calling the method in line 172?

```
private Schema newSchemaInstance(String topic, Class clazz, String schemaTypeOrClassName, boolean input) {
    return newSchemaInstance(topic, clazz, schemaTypeOrClassName, input,
            Thread.currentThread().getContextClassLoader());
}
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
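For readers following the hunk above, the first two resolution steps (empty name falls back to the default, then a case-insensitive match against the built-in types, otherwise the string is treated as a class name) can be sketched in isolation. This is only an illustration under stated assumptions: `BuiltinType`, `parseBuiltin`, and `describe` are hypothetical stand-ins and are not Pulsar's `SchemaType` or `TopicSchema` API.

```java
import java.util.Locale;

class SchemaNameResolution {

    // Hypothetical stand-in for a built-in schema type enum (NOT Pulsar's SchemaType).
    enum BuiltinType { STRING, JSON, AVRO }

    // Returns the matching built-in type, or null when the name is not a built-in type.
    // Enum.valueOf throws IllegalArgumentException for unknown names, which is the
    // signal used to fall through to the class-name interpretation.
    public static BuiltinType parseBuiltin(String name) {
        try {
            return BuiltinType.valueOf(name.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    // Classifies the string in the same order as the hunk: default, built-in, class name.
    public static String describe(String name) {
        if (name == null || name.isEmpty()) {
            return "default";
        }
        BuiltinType type = parseBuiltin(name);
        if (type != null) {
            return "builtin:" + type;
        }
        return "class:" + name;
    }

    public static void main(String[] args) {
        System.out.println(describe(""));
        System.out.println(describe("json"));
        System.out.println(describe("com.example.MySerDe"));
    }
}
```

The sketch passes `Locale.ROOT` to `toUpperCase`, because the no-argument form depends on the JVM's default locale; that is a choice made for this illustration and not something the pull request does.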
[GitHub] [pulsar] sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe
sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r339349701

## File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java ##

@@ -59,11 +60,7 @@ import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;

Review comment:
Don't use a wildcard import (`import java.util.*`); keep the explicit imports.
[GitHub] [pulsar] sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe
sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r338459546

## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java ##

@@ -163,6 +168,43 @@ private static boolean isProtobufClass(Class pojoClazz) {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private Schema newSchemaInstance(String topic, Class clazz, String schemaTypeOrClassName, boolean input, ClassLoader classLoader) {

Review comment:
I don't think we should duplicate the code just to add `classLoader`. It makes the code much harder to maintain. Can we avoid the duplication here? Instead, can you extend the method below so that it can load classes using an additional class loader?
[GitHub] [pulsar] sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe
sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r334229383

## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ##

@@ -207,6 +207,8 @@ JavaInstance setupJavaInstance() throws Exception {
 
         ContextImpl contextImpl = setupContext();
 
+        Thread.currentThread().setContextClassLoader(functionClassLoader);

Review comment:
I don't think this is a good fix. There are two class loaders in the Java instance: the instance class loader and the function class loader. We set the class loader to the function class loader before handling a message and set it back to the instance class loader afterwards. I think a better fix is to pass the function class loader to the code that loads the customized SerDe. Besides that, we should have a unit test or an integration test to verify that this works.
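
The two approaches contrasted in this comment can be sketched with plain JDK calls. This is a minimal illustration, not the code from the pull request: `ClassLoaderPassing`, `instantiate`, and `withContextClassLoader` are hypothetical names, and the "function class loader" in `main` is just the application class loader standing in for a real one.

```java
import java.util.concurrent.Callable;

class ClassLoaderPassing {

    // Hypothetical helper: loads and instantiates a class by name with the class
    // loader the caller passes in, rather than reading the thread context class loader.
    public static Object instantiate(String className, ClassLoader classLoader) throws Exception {
        Class<?> clazz = Class.forName(className, true, classLoader);
        return clazz.getDeclaredConstructor().newInstance();
    }

    // Hypothetical helper: the swap-and-restore pattern. The context class loader is
    // replaced only for the duration of the action and restored even if it throws.
    public static <T> T withContextClassLoader(ClassLoader loader, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a function class loader (assumption for this sketch).
        ClassLoader functionClassLoader = ClassLoaderPassing.class.getClassLoader();

        // Explicit passing: the loader is a visible argument of the call.
        Object list = instantiate("java.util.ArrayList", functionClassLoader);
        System.out.println(list.getClass().getName());

        // Scoped swap: the loader is ambient state that must be restored afterwards.
        Object viaContext = withContextClassLoader(functionClassLoader,
                () -> instantiate("java.util.ArrayList",
                        Thread.currentThread().getContextClassLoader()));
        System.out.println(viaContext.getClass().getName());
    }
}
```

Passing the loader as an argument keeps the dependency visible at the call site, whereas setting the context class loader without a matching restore leaves the thread in a changed state for all later code.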