This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new b8734646617 [fix][fn] fix function failed to start if no
`typeClassName` provided in `FunctionDetails` (#18111)
b8734646617 is described below
commit b87346466170c60cafb1a730c9e6e20677e5e8f5
Author: Rui Fu <[email protected]>
AuthorDate: Fri Nov 11 14:25:32 2022 +0800
[fix][fn] fix function failed to start if no `typeClassName` provided in
`FunctionDetails` (#18111)
(cherry picked from commit 8ad7157c1d22195720d256391a32773ca0108b80)
---
.../functions/runtime/JavaInstanceStarter.java | 96 ++++++++++++++++++++--
1 file changed, 89 insertions(+), 7 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index af7fc5cccc3..72c97ba561b 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.functions.runtime;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.StringConverter;
@@ -30,8 +32,15 @@ import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import io.prometheus.client.exporter.HTTPServer;
+import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
@@ -43,15 +52,9 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.common.util.Reflections;
-import java.lang.reflect.Type;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
@Slf4j
public class JavaInstanceStarter implements AutoCloseable {
@@ -164,6 +167,7 @@ public class JavaInstanceStarter implements AutoCloseable {
functionDetailsJsonString = functionDetailsJsonString.substring(0,
functionDetailsJsonString.length() - 1);
}
JsonFormat.parser().merge(functionDetailsJsonString,
functionDetailsBuilder);
+ inferringMissingTypeClassName(functionDetailsBuilder,
functionInstanceClassLoader);
Function.FunctionDetails functionDetails =
functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
@@ -286,6 +290,84 @@ public class JavaInstanceStarter implements AutoCloseable {
}
}
+ private void
inferringMissingTypeClassName(Function.FunctionDetails.Builder
functionDetailsBuilder,
+ ClassLoader classLoader) throws
ClassNotFoundException {
+ switch (functionDetailsBuilder.getComponentType()) {
+ case FUNCTION:
+ if ((functionDetailsBuilder.hasSource()
+ &&
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())
+ || (functionDetailsBuilder.hasSink()
+ &&
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
+ Map<String, Object> userConfigs = new
Gson().fromJson(functionDetailsBuilder.getUserConfig(),
+ new TypeToken<Map<String, Object>>() {
+ }.getType());
+ boolean isWindowConfigPresent =
userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
+ String className = functionDetailsBuilder.getClassName();
+ if (isWindowConfigPresent) {
+ WindowConfig windowConfig = new Gson().fromJson(
+ (new
Gson().toJson(userConfigs.get(WindowConfig.WINDOW_CONFIG_KEY))),
+ WindowConfig.class);
+ className =
windowConfig.getActualWindowFunctionClassName();
+ }
+
+ Class<?>[] typeArgs =
FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
+ isWindowConfigPresent);
+ if (functionDetailsBuilder.hasSource()
+ &&
functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
+ && typeArgs[0] != null) {
+ Function.SourceSpec.Builder sourceBuilder =
functionDetailsBuilder.getSource().toBuilder();
+ sourceBuilder.setTypeClassName(typeArgs[0].getName());
+
functionDetailsBuilder.setSource(sourceBuilder.build());
+ }
+
+ if (functionDetailsBuilder.hasSink()
+ &&
functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
+ && typeArgs[1] != null) {
+ Function.SinkSpec.Builder sinkBuilder =
functionDetailsBuilder.getSink().toBuilder();
+ sinkBuilder.setTypeClassName(typeArgs[1].getName());
+ functionDetailsBuilder.setSink(sinkBuilder.build());
+ }
+ }
+ break;
+ case SINK:
+ if ((functionDetailsBuilder.hasSink()
+ &&
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
+ String typeArg =
getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();
+
+ Function.SinkSpec.Builder sinkBuilder =
+
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
+ sinkBuilder.setTypeClassName(typeArg);
+ functionDetailsBuilder.setSink(sinkBuilder);
+
+ Function.SourceSpec sourceSpec =
functionDetailsBuilder.getSource();
+ if (null == sourceSpec ||
StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
+ Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(sourceSpec);
+ sourceBuilder.setTypeClassName(typeArg);
+ functionDetailsBuilder.setSource(sourceBuilder);
+ }
+ }
+ break;
+ case SOURCE:
+ if ((functionDetailsBuilder.hasSource()
+ &&
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
+ String typeArg =
getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();
+
+ Function.SourceSpec.Builder sourceBuilder =
+
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
+ sourceBuilder.setTypeClassName(typeArg);
+ functionDetailsBuilder.setSource(sourceBuilder);
+
+ Function.SinkSpec sinkSpec =
functionDetailsBuilder.getSink();
+ if (null == sinkSpec ||
StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
+ Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(sinkSpec);
+ sinkBuilder.setTypeClassName(typeArg);
+ functionDetailsBuilder.setSink(sinkBuilder);
+ }
+ }
+ break;
+ }
+ }
+
class InstanceControlImpl extends
InstanceControlGrpc.InstanceControlImplBase {
private RuntimeSpawner runtimeSpawner;