sijie closed pull request #2586: fixing/improving logging for function instance
URL: https://github.com/apache/incubator-pulsar/pull/2586
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 11da7c30ac..083686b503 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -22,27 +22,18 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.converters.StringConverter; -import com.google.gson.Gson; import com.google.protobuf.Empty; import com.google.protobuf.util.JsonFormat; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; - -import static org.apache.commons.lang3.StringUtils.isNotBlank; - import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; -import org.apache.pulsar.functions.proto.Function; -import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; -import org.apache.pulsar.functions.proto.Function.SinkSpec; -import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceControlGrpc; -import java.util.Map; import java.util.TimerTask; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index a978a09cb0..c5159b0706 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -22,32 +22,32 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.gson.Gson; import com.google.protobuf.Empty; import com.google.protobuf.util.JsonFormat; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; - -import static org.apache.commons.lang3.StringUtils.isNotBlank; - -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceControlGrpc; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry; import java.io.InputStream; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.TimerTask; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + /** * A function container implemented using java thread. 
*/ @@ -97,8 +97,14 @@ // by the child process and manually added to classpath args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile)); args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml"); - args.add("-Dpulsar.log.dir=" + logDirectory); - args.add("-Dpulsar.log.file=" + instanceConfig.getFunctionDetails().getName()); + args.add("-Dpulsar.function.log.dir=" + String.format( + "%s/%s", + logDirectory, + FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()))); + args.add("-Dpulsar.function.log.file=" + String.format( + "%s-%s", + instanceConfig.getFunctionDetails().getName(), + instanceConfig.getInstanceId())); if (instanceConfig.getFunctionDetails().getResources() != null) { Function.Resources resources = instanceConfig.getFunctionDetails().getResources(); if (resources.getRam() != 0) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 5cb8ce09cc..d752ed789d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -72,7 +72,9 @@ public void start() { log.info("ThreadContainer starting function with instance config {}", instanceConfig); this.fnThread = new Thread(threadGroup, javaInstanceRunnable, - FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())); + String.format("%s-%s", + FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()), + instanceConfig.getInstanceId())); this.fnThread.start(); } diff --git a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml index 25583a61f6..df25367562 100644 --- 
a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml +++ b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml @@ -23,16 +23,10 @@ Configuration: Properties: Property: - - name: "pulsar.log.dir" - value: "logs" - - name: "pulsar.log.file" - value: "pulsar-functions.log" - name: "pulsar.log.appender" - value: "RoutingAppender" + value: "RollingFile" - name: "pulsar.log.level" value: "info" - - name: "pulsar.routing.appender.default" - value: "RollingFile" - name: "bk.log.level" value: "info" @@ -48,8 +42,8 @@ Configuration: # Rolling file appender configuration RollingFile: name: RollingFile - fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}" - filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz" + fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log" + filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz" immediateFlush: false PatternLayout: Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" @@ -66,19 +60,19 @@ Configuration: # Delete file older than 30days DefaultRolloverStrategy: Delete: - basePath: ${sys:pulsar.log.dir} + basePath: ${sys:pulsar.function.log.dir} maxDepth: 2 IfFileName: - glob: "*/${sys:pulsar.log.file}*log.gz" + glob: "*/${sys:pulsar.function.log.file}*log.gz" IfLastModified: age: 30d # Rolling file appender configuration for bk RollingRandomAccessFile: name: BkRollingFile - fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk" - filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz" - immediateFlush: true + fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk" + filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz" + immediateFlush: false PatternLayout: Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" Policies: @@ -94,67 +88,17 @@ Configuration: # Delete file 
older than 30days DefaultRolloverStrategy: Delete: - basePath: ${sys:pulsar.log.dir} + basePath: ${sys:pulsar.function.log.dir} maxDepth: 2 IfFileName: - glob: "*/${sys:pulsar.log.file}.bk*log.gz" + glob: "*/${sys:pulsar.function.log.file}.bk*log.gz" IfLastModified: age: 30d - # Routing - Routing: - name: RoutingAppender - Routes: - pattern: "$${ctx:function}" - Route: - - - Routing: - name: InstanceRoutingAppender - Routes: - pattern: "$${ctx:instance}" - Route: - - - RollingFile: - name: "Rolling-${ctx:function}" - fileName : "${sys:pulsar.log.dir}/${ctx:function}/${sys:pulsar.log.file}-${ctx:instance}.log" - filePattern : "${sys:pulsar.log.dir}/${ctx:function}/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz" - PatternLayout: - Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance-%X{instance}] %logger{1} - %msg%n" - Policies: - TimeBasedTriggeringPolicy: - interval: 1 - modulate: true - SizeBasedTriggeringPolicy: - size: "20MB" - # Trigger every day at midnight that also scan - # roll-over strategy that deletes older file - CronTriggeringPolicy: - schedule: "0 0 0 * * ?" 
- # Delete file older than 30days - DefaultRolloverStrategy: - Delete: - basePath: ${sys:pulsar.log.dir} - maxDepth: 2 - IfFileName: - glob: "*/${sys:pulsar.log.file}*log.gz" - IfLastModified: - age: 30d - - ref: "${sys:pulsar.routing.appender.default}" - key: "${ctx:function}" - - ref: "${sys:pulsar.routing.appender.default}" - key: "${ctx:function}" - Loggers: Logger: - name: org.apache.bookkeeper - level: "${sys:bk.log.level}" - additivity: false - AppenderRef: - - ref: BkRollingFile - - Logger: - name: org.apache.distributedlog + name: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper level: "${sys:bk.log.level}" additivity: false AppenderRef: diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 5a6517e288..6f1e2f3ea6 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.ConsumerSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; @@ -119,7 +120,8 @@ public void testJavaConstructor() throws Exception { String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml " - + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " -Dpulsar.log.file=" + config.getFunctionDetails().getName() + + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails()) + + " 
-Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId() + " org.apache.pulsar.functions.runtime.JavaInstanceMain" + " --jar " + userJarFile + " --instance_id " + config.getInstanceId() + " --function_id " + config.getFunctionId()

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services