This is an automated email from the ASF dual-hosted git repository. albumenj pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit fa16bed3a363fc35f9e03541a33a5f529cba9981 Merge: 6a44987f5f 5d1c0eae1c Author: Albumen Kevin <jhq0...@gmail.com> AuthorDate: Mon May 13 14:49:07 2024 +0800 Merge branch 'apache-3.2' into apache-3.3 # Conflicts: # dubbo-monitor/dubbo-monitor-default/src/test/java/org/apache/dubbo/monitor/dubbo/DubboMonitorTest.java # dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java # dubbo-remoting/dubbo-remoting-http12/pom.xml .../src/main/java/org/apache/dubbo/common/URL.java | 4 + .../org/apache/dubbo/config/RegistryConfig.java | 2 - .../test/java/org/apache/dubbo/common/URLTest.java | 18 +++ .../dubbo/common/bytecode/ClassGeneratorTest.java | 8 +- .../config/deploy/DefaultApplicationDeployer.java | 2 +- .../dubbo/config/utils/ConfigValidationUtils.java | 10 ++ .../dubbo/config/bootstrap/DubboBootstrapTest.java | 125 ++++++++++++++++++++- ...egistryCenterExportMetadataIntegrationTest.java | 1 - ...egistryCenterExportProviderIntegrationTest.java | 1 - ...MultipleRegistryCenterInjvmIntegrationTest.java | 1 - ...terServiceDiscoveryRegistryIntegrationTest.java | 1 - ...egistryCenterExportMetadataIntegrationTest.java | 1 - ...egistryCenterExportProviderIntegrationTest.java | 1 - .../SingleRegistryCenterInjvmIntegrationTest.java | 1 - .../dubbo/config/utils/ReferenceCacheTest.java | 1 + .../dubbo/config/spring/util/AnnotationUtils.java | 18 +-- dubbo-maven-plugin/pom.xml | 6 +- .../apache/dubbo/metadata/ServiceNameMapping.java | 8 +- .../metadata/report/MetadataReportInstance.java | 19 ++-- .../support/AbstractMetadataReportFactory.java | 7 +- .../support/AbstractMetadataReportFactoryTest.java | 22 ++++ .../store/nacos/NacosMetadataReportFactory.java | 14 --- .../prometheus/PrometheusMetricsReporterTest.java | 6 +- .../PrometheusMetricsThreadPoolTest.java | 7 +- .../resources/ReactorDubbo3TripleStub.mustache | 1 + .../apache/dubbo/qos/command/impl/LiveTest.java | 1 + .../reactive/AbstractTripleReactorSubscriber.java | 2 +- .../reactive/ServerTripleReactorSubscriber.java | 42 +++++++ .../dubbo/reactive/calls/ReactorServerCalls.java | 47 +++++--- .../reactive/handler/OneToManyMethodHandler.java | 3 +- .../registry/client/AbstractServiceDiscovery.java | 2 +- .../metadata/MetadataServiceNameMapping.java | 2 +- .../transport/ChannelHandlerDispatcherTest.java | 14 +++ .../transport/netty4/NettyConnectionClient.java | 11 ++ .../org/apache/dubbo/rpc/support/RpcUtils.java | 2 +- .../java/org/apache/dubbo/rpc/RpcStatusTest.java | 12 ++ 36 files changed, 337 insertions(+), 86 deletions(-) diff --cc dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java index c1dd4105d8,2ff8978fc9..3152942f8c --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java @@@ -82,10 -82,10 +82,11 @@@ import java.util.stream.Collectors import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE; import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY; + import static org.apache.dubbo.common.constants.CommonConstants.CONFIG_NAMESPACE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_MONITOR_ADDRESS; import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL; +import static org.apache.dubbo.common.constants.CommonConstants.DubboProperty.DUBBO_IP_TO_REGISTRY; +import static org.apache.dubbo.common.constants.CommonConstants.DubboProperty.DUBBO_MONITOR_ADDRESS; import static org.apache.dubbo.common.constants.CommonConstants.FILE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.FILTER_KEY; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; @@@ -108,9 -109,9 +109,10 @@@ import static org.apache.dubbo.common.c import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_REGISTER_MODE_INTERFACE; import static org.apache.dubbo.common.constants.RegistryConstants.DUBBO_REGISTER_MODE_DEFAULT_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTER_MODE_KEY; + import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL; +import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL_TYPE; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_PROTOCOL; import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY; diff --cc dubbo-maven-plugin/pom.xml index 66c30332d0,e6be66aa25..5965bab27c --- a/dubbo-maven-plugin/pom.xml +++ b/dubbo-maven-plugin/pom.xml @@@ -94,16 -70,14 +94,20 @@@ </dependencies> <build> + <resources> + <resource> + <filtering>true</filtering> + <directory>src/main/resources</directory> + </resource> + </resources> <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-plugin-plugin</artifactId> - <version>3.10.2</version> + <version>3.13.0</version> + <configuration> + <goalPrefix>dubbo</goalPrefix> + </configuration> <executions> <execution> <id>default-addPluginArtifactMetadata</id> diff --cc dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache index ad9e2e232e,0000000000..c36513f916 mode 100644,000000..100644 --- a/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache +++ b/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache @@@ -1,197 -1,0 +1,198 @@@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +{{#packageName}} +package {{packageName}}; +{{/packageName}} + +import com.google.protobuf.Message; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.PathResolver; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.ServerService; +import org.apache.dubbo.rpc.TriRpcStatus; +import org.apache.dubbo.rpc.model.MethodDescriptor; +import org.apache.dubbo.rpc.model.ServiceDescriptor; +import org.apache.dubbo.rpc.model.StubMethodDescriptor; +import org.apache.dubbo.rpc.model.StubServiceDescriptor; +import org.apache.dubbo.reactive.handler.ManyToManyMethodHandler; +import org.apache.dubbo.reactive.handler.ManyToOneMethodHandler; +import org.apache.dubbo.reactive.handler.OneToManyMethodHandler; +import org.apache.dubbo.reactive.calls.ReactorClientCalls; +import org.apache.dubbo.reactive.handler.OneToOneMethodHandler; + +import org.apache.dubbo.rpc.stub.StubInvoker; +import org.apache.dubbo.rpc.stub.StubMethodHandler; +import org.apache.dubbo.rpc.stub.StubSuppliers; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashMap; +import java.util.Map; + +public final class {{className}} { + + private {{className}}() {} + + public static final String SERVICE_NAME = {{interfaceClassName}}.SERVICE_NAME; + + private static final StubServiceDescriptor serviceDescriptor = new StubServiceDescriptor(SERVICE_NAME,{{interfaceClassName}}.class); + + static { ++ org.apache.dubbo.rpc.protocol.tri.service.SchemaDescriptorRegistry.addSchemaDescriptor(SERVICE_NAME,{{outerClassName}}.getDescriptor()); + StubSuppliers.addSupplier(SERVICE_NAME, {{className}}::newStub); + StubSuppliers.addSupplier({{interfaceClassName}}.JAVA_SERVICE_NAME, {{className}}::newStub); + StubSuppliers.addDescriptor(SERVICE_NAME, serviceDescriptor); + StubSuppliers.addDescriptor({{interfaceClassName}}.JAVA_SERVICE_NAME, serviceDescriptor); + } + + @SuppressWarnings("all") + public static {{interfaceClassName}} newStub(Invoker<?> invoker) { + return new {{interfaceClassName}}Stub((Invoker<{{interfaceClassName}}>)invoker); + } + +{{#unaryMethods}} + {{#javaDoc}} + {{{javaDoc}}} + {{/javaDoc}} + private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}", + {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.UNARY, + obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom, + {{outputType}}::parseFrom); +{{/unaryMethods}} + +{{#serverStreamingMethods}} + {{#javaDoc}} + {{{javaDoc}}} + {{/javaDoc}} + private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}", + {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.SERVER_STREAM, + obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom, + {{outputType}}::parseFrom); +{{/serverStreamingMethods}} + +{{#clientStreamingMethods}} + {{#javaDoc}} + {{{javaDoc}}} + {{/javaDoc}} + private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}", + {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.CLIENT_STREAM, + obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom, + {{outputType}}::parseFrom); +{{/clientStreamingMethods}} + +{{#biStreamingWithoutClientStreamMethods}} + {{#javaDoc}} + {{{javaDoc}}} + {{/javaDoc}} + private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}", + {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.BI_STREAM, + obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom, + {{outputType}}::parseFrom); +{{/biStreamingWithoutClientStreamMethods}} + + static{ + {{#unaryMethods}} + serviceDescriptor.addMethod({{methodName}}Method); + {{/unaryMethods}} + {{#serverStreamingMethods}} + serviceDescriptor.addMethod({{methodName}}Method); + {{/serverStreamingMethods}} + {{#clientStreamingMethods}} + serviceDescriptor.addMethod({{methodName}}Method); + {{/clientStreamingMethods}} + {{#biStreamingWithoutClientStreamMethods}} + serviceDescriptor.addMethod({{methodName}}Method); + {{/biStreamingWithoutClientStreamMethods}} + } + + public static class {{interfaceClassName}}Stub implements {{interfaceClassName}}{ + + private final Invoker<{{interfaceClassName}}> invoker; + + public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}> invoker) { + this.invoker = invoker; + } + + {{#methods}} + {{#javaDoc}} + {{{javaDoc}}} + {{/javaDoc}} + {{#deprecated}} + @java.lang.Deprecated + {{/deprecated}} + public {{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}> {{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}> request) { + return ReactorClientCalls.{{reactiveCallsMethodName}}(invoker, request, {{methodNameCamelCase}}Method); + } + {{/methods}} + } + + public static abstract class {{interfaceClassName}}ImplBase implements {{interfaceClassName}}, ServerService<{{interfaceClassName}}> { + + @Override + public final Invoker<{{interfaceClassName}}> getInvoker(URL url) { + PathResolver pathResolver = url.getOrDefaultFrameworkModel() + .getExtensionLoader(PathResolver.class) + .getDefaultExtension(); + Map<String,StubMethodHandler<?, ?>> handlers = new HashMap<>(); + + {{#methods}} + pathResolver.addNativeStub( "/" + SERVICE_NAME + "/{{originMethodName}}"); + // for compatibility + pathResolver.addNativeStub( "/" + JAVA_SERVICE_NAME + "/{{originMethodName}}"); + {{/methods}} + + {{#unaryMethods}} + handlers.put({{methodName}}Method.getMethodName(), new OneToOneMethodHandler<>(this::{{methodName}})); + {{/unaryMethods}} + {{#serverStreamingMethods}} + handlers.put({{methodName}}Method.getMethodName(), new OneToManyMethodHandler<>(this::{{methodName}})); + {{/serverStreamingMethods}} + {{#clientStreamingMethods}} + handlers.put({{methodName}}Method.getMethodName(), new ManyToOneMethodHandler<>(this::{{methodName}})); + {{/clientStreamingMethods}} + {{#biStreamingWithoutClientStreamMethods}} + handlers.put({{methodName}}Method.getMethodName(), new ManyToManyMethodHandler<>(this::{{methodName}})); + {{/biStreamingWithoutClientStreamMethods}} + + return new StubInvoker<>(this, url, {{interfaceClassName}}.class, handlers); + } + + {{#methods}} + {{#javaDoc}} + {{{javaDoc}}} + {{/javaDoc}} + {{#deprecated}} + @java.lang.Deprecated + {{/deprecated}} + public {{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}> {{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}> request) { + throw unimplementedMethodException({{methodName}}Method); + } + {{/methods}} + + @Override + public final ServiceDescriptor getServiceDescriptor() { + return serviceDescriptor; + } + + private RpcException unimplementedMethodException(StubMethodDescriptor methodDescriptor) { + return TriRpcStatus.UNIMPLEMENTED.withDescription(String.format("Method %s is unimplemented", + "/" + serviceDescriptor.getInterfaceName() + "/" + methodDescriptor.getMethodName())).asException(); + } + } +} diff --cc dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java index 5402cefc30,24218a1b08..ea96e071e8 --- a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java +++ b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java @@@ -61,14 -67,21 +67,21 @@@ public final class ReactorServerCalls * @param responseObserver response StreamObserver * @param func service implementation */ - public static <T, R> void oneToMany( + public static <T, R> CompletableFuture<List<R>> oneToMany( T request, StreamObserver<R> responseObserver, Function<Mono<T>, Flux<R>> func) { try { - ServerCallToObserverAdapter<R> serverCallToObserverAdapter = - (ServerCallToObserverAdapter<R>) responseObserver; ++ CallStreamObserver<R> callStreamObserver = ++ (CallStreamObserver<R>) responseObserver; Flux<R> response = func.apply(Mono.just(request)); - ServerTripleReactorSubscriber<R> subscriber = response.subscribeWith(new ServerTripleReactorSubscriber<>()); - subscriber.subscribe((CallStreamObserver<R>) responseObserver); + ServerTripleReactorSubscriber<R> reactorSubscriber = - new ServerTripleReactorSubscriber<>(serverCallToObserverAdapter); - response.subscribeWith(reactorSubscriber).subscribe(serverCallToObserverAdapter); ++ new ServerTripleReactorSubscriber<>(callStreamObserver); ++ response.subscribeWith(reactorSubscriber).subscribe(callStreamObserver); + return reactorSubscriber.getExecutionFuture(); } catch (Throwable throwable) { - responseObserver.onError(throwable); + doOnResponseHasException(throwable, responseObserver); + CompletableFuture<List<R>> future = new CompletableFuture<>(); + future.completeExceptionally(throwable); + return future; } }