This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit fa16bed3a363fc35f9e03541a33a5f529cba9981
Merge: 6a44987f5f 5d1c0eae1c
Author: Albumen Kevin <jhq0...@gmail.com>
AuthorDate: Mon May 13 14:49:07 2024 +0800

    Merge branch 'apache-3.2' into apache-3.3
    
    # Conflicts:
    #       dubbo-monitor/dubbo-monitor-default/src/test/java/org/apache/dubbo/monitor/dubbo/DubboMonitorTest.java
    #       dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
    #       dubbo-remoting/dubbo-remoting-http12/pom.xml

 .../src/main/java/org/apache/dubbo/common/URL.java |   4 +
 .../org/apache/dubbo/config/RegistryConfig.java    |   2 -
 .../test/java/org/apache/dubbo/common/URLTest.java |  18 +++
 .../dubbo/common/bytecode/ClassGeneratorTest.java  |   8 +-
 .../config/deploy/DefaultApplicationDeployer.java  |   2 +-
 .../dubbo/config/utils/ConfigValidationUtils.java  |  10 ++
 .../dubbo/config/bootstrap/DubboBootstrapTest.java | 125 ++++++++++++++++++++-
 ...egistryCenterExportMetadataIntegrationTest.java |   1 -
 ...egistryCenterExportProviderIntegrationTest.java |   1 -
 ...MultipleRegistryCenterInjvmIntegrationTest.java |   1 -
 ...terServiceDiscoveryRegistryIntegrationTest.java |   1 -
 ...egistryCenterExportMetadataIntegrationTest.java |   1 -
 ...egistryCenterExportProviderIntegrationTest.java |   1 -
 .../SingleRegistryCenterInjvmIntegrationTest.java  |   1 -
 .../dubbo/config/utils/ReferenceCacheTest.java     |   1 +
 .../dubbo/config/spring/util/AnnotationUtils.java  |  18 +--
 dubbo-maven-plugin/pom.xml                         |   6 +-
 .../apache/dubbo/metadata/ServiceNameMapping.java  |   8 +-
 .../metadata/report/MetadataReportInstance.java    |  19 ++--
 .../support/AbstractMetadataReportFactory.java     |   7 +-
 .../support/AbstractMetadataReportFactoryTest.java |  22 ++++
 .../store/nacos/NacosMetadataReportFactory.java    |  14 ---
 .../prometheus/PrometheusMetricsReporterTest.java  |   6 +-
 .../PrometheusMetricsThreadPoolTest.java           |   7 +-
 .../resources/ReactorDubbo3TripleStub.mustache     |   1 +
 .../apache/dubbo/qos/command/impl/LiveTest.java    |   1 +
 .../reactive/AbstractTripleReactorSubscriber.java  |   2 +-
 .../reactive/ServerTripleReactorSubscriber.java    |  42 +++++++
 .../dubbo/reactive/calls/ReactorServerCalls.java   |  47 +++++---
 .../reactive/handler/OneToManyMethodHandler.java   |   3 +-
 .../registry/client/AbstractServiceDiscovery.java  |   2 +-
 .../metadata/MetadataServiceNameMapping.java       |   2 +-
 .../transport/ChannelHandlerDispatcherTest.java    |  14 +++
 .../transport/netty4/NettyConnectionClient.java    |  11 ++
 .../org/apache/dubbo/rpc/support/RpcUtils.java     |   2 +-
 .../java/org/apache/dubbo/rpc/RpcStatusTest.java   |  12 ++
 36 files changed, 337 insertions(+), 86 deletions(-)

diff --cc dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java
index c1dd4105d8,2ff8978fc9..3152942f8c
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java
@@@ -82,10 -82,10 +82,11 @@@ import java.util.stream.Collectors
  
  import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
  import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
+ import static 
org.apache.dubbo.common.constants.CommonConstants.CONFIG_NAMESPACE_KEY;
  import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
 -import static 
org.apache.dubbo.common.constants.CommonConstants.DUBBO_MONITOR_ADDRESS;
  import static 
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
 +import static 
org.apache.dubbo.common.constants.CommonConstants.DubboProperty.DUBBO_IP_TO_REGISTRY;
 +import static 
org.apache.dubbo.common.constants.CommonConstants.DubboProperty.DUBBO_MONITOR_ADDRESS;
  import static org.apache.dubbo.common.constants.CommonConstants.FILE_KEY;
  import static org.apache.dubbo.common.constants.CommonConstants.FILTER_KEY;
  import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
@@@ -108,9 -109,9 +109,10 @@@ import static org.apache.dubbo.common.c
  import static 
org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_REGISTER_MODE_INTERFACE;
  import static 
org.apache.dubbo.common.constants.RegistryConstants.DUBBO_REGISTER_MODE_DEFAULT_KEY;
  import static 
org.apache.dubbo.common.constants.RegistryConstants.REGISTER_MODE_KEY;
+ import static 
org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY;
  import static 
org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
  import static 
org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL;
 +import static 
org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL_TYPE;
  import static 
org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
  import static 
org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_PROTOCOL;
  import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY;
diff --cc dubbo-maven-plugin/pom.xml
index 66c30332d0,e6be66aa25..5965bab27c
--- a/dubbo-maven-plugin/pom.xml
+++ b/dubbo-maven-plugin/pom.xml
@@@ -94,16 -70,14 +94,20 @@@
    </dependencies>
  
    <build>
 +    <resources>
 +      <resource>
 +        <filtering>true</filtering>
 +        <directory>src/main/resources</directory>
 +      </resource>
 +    </resources>
      <plugins>
        <plugin>
+         <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-plugin-plugin</artifactId>
-         <version>3.10.2</version>
+         <version>3.13.0</version>
+         <configuration>
+           <goalPrefix>dubbo</goalPrefix>
+         </configuration>
          <executions>
            <execution>
              <id>default-addPluginArtifactMetadata</id>
diff --cc dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
index ad9e2e232e,0000000000..c36513f916
mode 100644,000000..100644
--- a/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
+++ b/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
@@@ -1,197 -1,0 +1,198 @@@
 +/*
 +* Licensed to the Apache Software Foundation (ASF) under one or more
 +* contributor license agreements.  See the NOTICE file distributed with
 +* this work for additional information regarding copyright ownership.
 +* The ASF licenses this file to You under the Apache License, Version 2.0
 +* (the "License"); you may not use this file except in compliance with
 +* the License.  You may obtain a copy of the License at
 +*
 +*     http://www.apache.org/licenses/LICENSE-2.0
 +*
 +* Unless required by applicable law or agreed to in writing, software
 +* distributed under the License is distributed on an "AS IS" BASIS,
 +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +* See the License for the specific language governing permissions and
 +* limitations under the License.
 +*/
 +
 +{{#packageName}}
 +package {{packageName}};
 +{{/packageName}}
 +
 +import com.google.protobuf.Message;
 +import org.apache.dubbo.common.URL;
 +import org.apache.dubbo.rpc.Invoker;
 +import org.apache.dubbo.rpc.PathResolver;
 +import org.apache.dubbo.rpc.RpcException;
 +import org.apache.dubbo.rpc.ServerService;
 +import org.apache.dubbo.rpc.TriRpcStatus;
 +import org.apache.dubbo.rpc.model.MethodDescriptor;
 +import org.apache.dubbo.rpc.model.ServiceDescriptor;
 +import org.apache.dubbo.rpc.model.StubMethodDescriptor;
 +import org.apache.dubbo.rpc.model.StubServiceDescriptor;
 +import org.apache.dubbo.reactive.handler.ManyToManyMethodHandler;
 +import org.apache.dubbo.reactive.handler.ManyToOneMethodHandler;
 +import org.apache.dubbo.reactive.handler.OneToManyMethodHandler;
 +import org.apache.dubbo.reactive.calls.ReactorClientCalls;
 +import org.apache.dubbo.reactive.handler.OneToOneMethodHandler;
 +
 +import org.apache.dubbo.rpc.stub.StubInvoker;
 +import org.apache.dubbo.rpc.stub.StubMethodHandler;
 +import org.apache.dubbo.rpc.stub.StubSuppliers;
 +import reactor.core.publisher.Flux;
 +import reactor.core.publisher.Mono;
 +
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +public final class {{className}} {
 +
 +    private {{className}}() {}
 +
 +    public static final String SERVICE_NAME = 
{{interfaceClassName}}.SERVICE_NAME;
 +
 +    private static final StubServiceDescriptor serviceDescriptor = new 
StubServiceDescriptor(SERVICE_NAME,{{interfaceClassName}}.class);
 +
 +    static {
++        
org.apache.dubbo.rpc.protocol.tri.service.SchemaDescriptorRegistry.addSchemaDescriptor(SERVICE_NAME,{{outerClassName}}.getDescriptor());
 +        StubSuppliers.addSupplier(SERVICE_NAME, {{className}}::newStub);
 +        StubSuppliers.addSupplier({{interfaceClassName}}.JAVA_SERVICE_NAME,  
{{className}}::newStub);
 +        StubSuppliers.addDescriptor(SERVICE_NAME, serviceDescriptor);
 +        StubSuppliers.addDescriptor({{interfaceClassName}}.JAVA_SERVICE_NAME, 
serviceDescriptor);
 +    }
 +
 +    @SuppressWarnings("all")
 +    public static {{interfaceClassName}} newStub(Invoker<?> invoker) {
 +        return new 
{{interfaceClassName}}Stub((Invoker<{{interfaceClassName}}>)invoker);
 +    }
 +
 +{{#unaryMethods}}
 +    {{#javaDoc}}
 +        {{{javaDoc}}}
 +    {{/javaDoc}}
 +    private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
 +        {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.UNARY,
 +        obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
 +        {{outputType}}::parseFrom);
 +{{/unaryMethods}}
 +
 +{{#serverStreamingMethods}}
 +    {{#javaDoc}}
 +        {{{javaDoc}}}
 +    {{/javaDoc}}
 +    private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
 +        {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.SERVER_STREAM,
 +        obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
 +        {{outputType}}::parseFrom);
 +{{/serverStreamingMethods}}
 +
 +{{#clientStreamingMethods}}
 +    {{#javaDoc}}
 +        {{{javaDoc}}}
 +    {{/javaDoc}}
 +    private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
 +        {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.CLIENT_STREAM,
 +        obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
 +        {{outputType}}::parseFrom);
 +{{/clientStreamingMethods}}
 +
 +{{#biStreamingWithoutClientStreamMethods}}
 +    {{#javaDoc}}
 +        {{{javaDoc}}}
 +    {{/javaDoc}}
 +    private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
 +        {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.BI_STREAM,
 +        obj -> ((Message) obj).toByteArray(), obj -> ((Message) 
obj).toByteArray(), {{inputType}}::parseFrom,
 +        {{outputType}}::parseFrom);
 +{{/biStreamingWithoutClientStreamMethods}}
 +
 +    static{
 +    {{#unaryMethods}}
 +        serviceDescriptor.addMethod({{methodName}}Method);
 +    {{/unaryMethods}}
 +    {{#serverStreamingMethods}}
 +        serviceDescriptor.addMethod({{methodName}}Method);
 +    {{/serverStreamingMethods}}
 +    {{#clientStreamingMethods}}
 +        serviceDescriptor.addMethod({{methodName}}Method);
 +    {{/clientStreamingMethods}}
 +    {{#biStreamingWithoutClientStreamMethods}}
 +        serviceDescriptor.addMethod({{methodName}}Method);
 +    {{/biStreamingWithoutClientStreamMethods}}
 +    }
 +
 +    public static class {{interfaceClassName}}Stub implements 
{{interfaceClassName}}{
 +
 +        private final Invoker<{{interfaceClassName}}> invoker;
 +
 +        public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}> 
invoker) {
 +            this.invoker = invoker;
 +        }
 +
 +    {{#methods}}
 +        {{#javaDoc}}
 +            {{{javaDoc}}}
 +        {{/javaDoc}}
 +        {{#deprecated}}
 +            @java.lang.Deprecated
 +        {{/deprecated}}
 +        public 
{{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}>
 
{{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}>
 request) {
 +            return ReactorClientCalls.{{reactiveCallsMethodName}}(invoker, 
request, {{methodNameCamelCase}}Method);
 +        }
 +    {{/methods}}
 +    }
 +
 +    public static abstract class {{interfaceClassName}}ImplBase implements 
{{interfaceClassName}}, ServerService<{{interfaceClassName}}> {
 +
 +        @Override
 +        public final Invoker<{{interfaceClassName}}> getInvoker(URL url) {
 +            PathResolver pathResolver = url.getOrDefaultFrameworkModel()
 +            .getExtensionLoader(PathResolver.class)
 +            .getDefaultExtension();
 +            Map<String,StubMethodHandler<?, ?>> handlers = new HashMap<>();
 +
 +            {{#methods}}
 +                pathResolver.addNativeStub( "/" + SERVICE_NAME + 
"/{{originMethodName}}");
 +                // for compatibility
 +                pathResolver.addNativeStub( "/" + JAVA_SERVICE_NAME + 
"/{{originMethodName}}");
 +            {{/methods}}
 +
 +            {{#unaryMethods}}
 +                handlers.put({{methodName}}Method.getMethodName(), new 
OneToOneMethodHandler<>(this::{{methodName}}));
 +            {{/unaryMethods}}
 +            {{#serverStreamingMethods}}
 +                handlers.put({{methodName}}Method.getMethodName(), new 
OneToManyMethodHandler<>(this::{{methodName}}));
 +            {{/serverStreamingMethods}}
 +            {{#clientStreamingMethods}}
 +                handlers.put({{methodName}}Method.getMethodName(), new 
ManyToOneMethodHandler<>(this::{{methodName}}));
 +            {{/clientStreamingMethods}}
 +            {{#biStreamingWithoutClientStreamMethods}}
 +                handlers.put({{methodName}}Method.getMethodName(), new 
ManyToManyMethodHandler<>(this::{{methodName}}));
 +            {{/biStreamingWithoutClientStreamMethods}}
 +
 +            return new StubInvoker<>(this, url, {{interfaceClassName}}.class, 
handlers);
 +        }
 +
 +    {{#methods}}
 +        {{#javaDoc}}
 +            {{{javaDoc}}}
 +        {{/javaDoc}}
 +        {{#deprecated}}
 +            @java.lang.Deprecated
 +        {{/deprecated}}
 +        public 
{{#isManyOutput}}Flux{{/isManyOutput}}{{^isManyOutput}}Mono{{/isManyOutput}}<{{outputType}}>
 
{{methodName}}({{#isManyInput}}Flux{{/isManyInput}}{{^isManyInput}}Mono{{/isManyInput}}<{{inputType}}>
 request) {
 +            throw unimplementedMethodException({{methodName}}Method);
 +        }
 +    {{/methods}}
 +
 +        @Override
 +        public final ServiceDescriptor getServiceDescriptor() {
 +            return serviceDescriptor;
 +        }
 +
 +        private RpcException 
unimplementedMethodException(StubMethodDescriptor methodDescriptor) {
 +            return 
TriRpcStatus.UNIMPLEMENTED.withDescription(String.format("Method %s is 
unimplemented",
 +            "/" + serviceDescriptor.getInterfaceName() + "/" + 
methodDescriptor.getMethodName())).asException();
 +        }
 +    }
 +}
diff --cc dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
index 5402cefc30,24218a1b08..ea96e071e8
--- a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
+++ b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java
@@@ -61,14 -67,21 +67,21 @@@ public final class ReactorServerCalls 
       * @param responseObserver response StreamObserver
       * @param func service implementation
       */
-     public static <T, R> void oneToMany(
+     public static <T, R> CompletableFuture<List<R>> oneToMany(
              T request, StreamObserver<R> responseObserver, Function<Mono<T>, 
Flux<R>> func) {
          try {
 -            ServerCallToObserverAdapter<R> serverCallToObserverAdapter =
 -                    (ServerCallToObserverAdapter<R>) responseObserver;
++            CallStreamObserver<R> callStreamObserver =
++                    (CallStreamObserver<R>) responseObserver;
              Flux<R> response = func.apply(Mono.just(request));
-             ServerTripleReactorSubscriber<R> subscriber = 
response.subscribeWith(new ServerTripleReactorSubscriber<>());
-             subscriber.subscribe((CallStreamObserver<R>) responseObserver);
+             ServerTripleReactorSubscriber<R> reactorSubscriber =
 -                    new 
ServerTripleReactorSubscriber<>(serverCallToObserverAdapter);
 -            
response.subscribeWith(reactorSubscriber).subscribe(serverCallToObserverAdapter);
++                    new ServerTripleReactorSubscriber<>(callStreamObserver);
++            
response.subscribeWith(reactorSubscriber).subscribe(callStreamObserver);
+             return reactorSubscriber.getExecutionFuture();
          } catch (Throwable throwable) {
-             responseObserver.onError(throwable);
+             doOnResponseHasException(throwable, responseObserver);
+             CompletableFuture<List<R>> future = new CompletableFuture<>();
+             future.completeExceptionally(throwable);
+             return future;
          }
      }
  

Reply via email to