[GitHub] [pulsar] sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe

2019-10-27 Thread GitBox
sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix 
pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r339349691
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 ##
 @@ -163,6 +168,43 @@ private static boolean isProtobufClass(Class<?> pojoClazz) {
 }
 }
 
+    @SuppressWarnings("unchecked")
+    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input, ClassLoader classLoader) {
+        // The schemaTypeOrClassName can represent multiple thing, either a schema type, a schema class name or a ser-de
+        // class name.
+
+        if (StringUtils.isEmpty(schemaTypeOrClassName) || DEFAULT_SERDE.equals(schemaTypeOrClassName)) {
+            // No preferred schema was provided, auto-discover schema or fallback to defaults
+            return newSchemaInstance(clazz, getSchemaTypeOrDefault(topic, clazz));
+        }
+
+        SchemaType schemaType = null;
+        try {
+            schemaType = SchemaType.valueOf(schemaTypeOrClassName.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            // schemaType is not referring to builtin type
+        }
+
+        if (schemaType != null) {
+            // The parameter passed was indeed a valid builtin schema type
+            return newSchemaInstance(clazz, schemaType);
+        }
+
+        // At this point, the string can represent either a schema or serde class name. Create an instance and
+        // check if it complies with either interface
+
+        // First try with Schema
+        try {
+            return (Schema<T>) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
+                    classLoader, clazz, input);
+        } catch (Throwable t) {
+            // Now try with Serde or just fail
+            SerDe<T> serDe = (SerDe<T>) InstanceUtils.initializeSerDe(schemaTypeOrClassName,
+                    classLoader, clazz, input);
+            return new SerDeSchema<>(serDe);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
 
 Review comment:
   Can you rewrite this method so that it simply calls the overload added at line 172 (the one that takes a `ClassLoader`)?
   
   ```
   private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
       return newSchemaInstance(topic, clazz, schemaTypeOrClassName, input, Thread.currentThread().getContextClassLoader());
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe

2019-10-27 Thread GitBox
sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix 
pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r339349701
 
 

 ##
 File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 ##
 @@ -59,11 +60,7 @@
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 Review comment:
   Please don't use a wildcard import (`import java.util.*`); keep the explicit imports.




[GitHub] [pulsar] sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe

2019-10-24 Thread GitBox
sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix 
pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r338459546
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 ##
 @@ -163,6 +168,43 @@ private static boolean isProtobufClass(Class<?> pojoClazz) {
 }
 }
 
+    @SuppressWarnings("unchecked")
+    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input, ClassLoader classLoader) {
 
 Review comment:
   I don't think we should duplicate the code just to add `classLoader`.
   Duplication makes the code much harder to maintain. Can we avoid it here?
   Instead, can you extend the method below so that it can also load classes
   through an additional class loader?
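
   One way to read this suggestion, as a minimal sketch and not the actual Pulsar change: keep a single method that owns the loading logic and takes the class loader as a parameter, and let the shorter overload do nothing except supply a default loader. `OverloadDelegationSketch` and `resolve` are hypothetical names used only for illustration; they are not part of `TopicSchema`.

```java
public class OverloadDelegationSketch {

    // Hypothetical stand-in for the real lookup. This is the only copy of the
    // loading logic, and the class loader is an explicit parameter.
    public static String resolve(String className, ClassLoader classLoader) {
        try {
            // 'false' means: load the class but do not run its static initializers.
            return Class.forName(className, false, classLoader).getName();
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Cannot load " + className, e);
        }
    }

    // Convenience overload with no logic of its own: it only picks a default
    // class loader and delegates to the method above.
    public static String resolve(String className) {
        return resolve(className, Thread.currentThread().getContextClassLoader());
    }

    public static void main(String[] args) {
        System.out.println(resolve("java.lang.String"));
    }
}
```

   With this shape, a later change to the lookup rules touches one method only, which is the maintenance concern raised above.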




[GitHub] [pulsar] sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe

2019-10-12 Thread GitBox
sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix 
pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r334229383
 
 

 ##
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 ##
 @@ -207,6 +207,8 @@ JavaInstance setupJavaInstance() throws Exception {
 
 ContextImpl contextImpl = setupContext();
 
+Thread.currentThread().setContextClassLoader(functionClassLoader);
 
 Review comment:
   I don't think this is a good fix. There are two class loaders in the Java
   instance: the instance class loader and the function class loader. We switch
   to the function class loader before handling a message and switch back to the
   instance class loader afterwards.
   
   I think a better fix is to pass the function class loader to the code that
   loads the customized SerDe.
   
   Besides that, we should have a unit test or an integration test to verify
   that this works.
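
   To illustrate the two approaches contrasted above, here is a minimal sketch under stated assumptions; it is not the Pulsar implementation. `ClassLoaderSketch`, `newInstance` and `withContextClassLoader` are hypothetical names, and the class's own loader stands in for the function class loader. The first helper loads a class through an explicitly passed class loader; the second shows the usual set-and-restore pattern for the thread context class loader.

```java
import java.util.function.Supplier;

public class ClassLoaderSketch {

    // Hypothetical helper: instantiate a class by name through an explicitly
    // supplied class loader, independent of the thread context class loader.
    public static Object newInstance(String className, ClassLoader classLoader) {
        try {
            Class<?> loaded = Class.forName(className, true, classLoader);
            return loaded.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    // Hypothetical helper: run an action under a temporary context class
    // loader and always restore the previous one, even if the action throws.
    public static <R> R withContextClassLoader(ClassLoader temporary, Supplier<R> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(temporary);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the function class loader (an assumption for this sketch).
        ClassLoader functionClassLoader = ClassLoaderSketch.class.getClassLoader();

        // Explicit loader: the caller decides which loader resolves the class.
        Object instance = newInstance("java.util.ArrayList", functionClassLoader);
        System.out.println(instance.getClass().getName());

        // Scoped swap: the context class loader is changed only for the action.
        String name = withContextClassLoader(functionClassLoader,
                () -> newInstance("java.util.HashMap",
                        Thread.currentThread().getContextClassLoader()).getClass().getName());
        System.out.println(name);
    }
}
```

   Passing the loader explicitly keeps the choice visible at the call site, whereas a context class loader that is set once and never restored affects every later lookup on that thread.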

