dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380062879


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -108,13 +106,19 @@
 import static org.apache.kafka.common.utils.Utils.propsToMap;
 
 /**
- * This prototype consumer uses an {@link ApplicationEventHandler event 
handler} to process
- * {@link ApplicationEvent application events} so that the network IO can be 
processed in a dedicated
+ * This {@link Consumer} implementation uses an {@link ApplicationEventHandler 
event handler} to process
+ * {@link ApplicationEvent application events} so that the network I/O can be 
processed in a dedicated
  * {@link ConsumerNetworkThread network thread}. Visit
  * <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor">this
 document</a>
- * for detail implementation.
+ * for implementation detail.
+ *
+ * <p/>
+ *
+ * <em>Note:</em> this {@link Consumer} implementation is part of the revised 
consumer group protocol from KIP-848.
+ * This class should not be invoked directly; users should instead create a 
{@link KafkaConsumer} as before.
+ * This consumer implements the new consumer group protocol and is intended to 
be the default in coming releases.
  */
-public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
+public class AsyncKafkaConsumer<K, V> implements Consumer<K, V> {

Review Comment:
   I wonder if we should make it package private. Thoughts?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -148,30 +152,38 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
 
-    public PrototypeAsyncConsumer(final Properties properties,
-                                  final Deserializer<K> keyDeserializer,
-                                  final Deserializer<V> valueDeserializer) {
+    public AsyncKafkaConsumer(Map<String, Object> configs) {

Review Comment:
   I really wonder if we need all those constructors now that we have the 
delegate creator. For instance, would it be possible to only keep the following 
ones and keep all the conversion to `ConsumerConfig` in the delegate creator?
   ```
   public AsyncKafkaConsumer(final ConsumerConfig config,
                             final Deserializer<K> keyDeserializer,
                             final Deserializer<V> valueDeserializer) {
       this(Time.SYSTEM, config, keyDeserializer, valueDeserializer);
   }

   public AsyncKafkaConsumer(final Time time,
                             final ConsumerConfig config,
                             final Deserializer<K> keyDeserializer,
                             final Deserializer<V> valueDeserializer) {
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * <p/>
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * <p/>
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+    /**
+     * This is it! This is the core logic. It's extremely rudimentary.
+     */
+    private static boolean useNewConsumer(Map<?, ?> configs) {
+        Object groupProtocol = configs.get("group.protocol");

Review Comment:
   nit: Could we use `getString`?



##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -281,7 +281,7 @@ private int 
maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map<Object
 
             if (!options.hasDryRun()) {
                 for (final TopicPartition p : partitions) {
-                    client.position(p);
+                    long pos = client.position(p);

Review Comment:
   It is strange that we have to change this now.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * <p/>
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * <p/>
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+    /**
+     * This is it! This is the core logic. It's extremely rudimentary.
+     */
+    private static boolean useNewConsumer(Map<?, ?> configs) {
+        Object groupProtocol = configs.get("group.protocol");
+
+        // Takes care of both the null and type checks.
+        if (!(groupProtocol instanceof String))
+            return false;
+
+        return ((String) groupProtocol).equalsIgnoreCase("consumer");
+    }
+
+    public <K, V> Consumer<K, V> create(Map<String, Object> configs) {
+        return createInternal(() -> {
+            if (useNewConsumer(configs)) {
+                return new AsyncKafkaConsumer<>(configs);
+            } else {
+                return new LegacyKafkaConsumer<>(configs);
+            }
+        });
+    }
+
+    public <K, V> Consumer<K, V> create(Properties properties) {
+        return createInternal(() -> {
+            if (useNewConsumer(properties)) {
+                return new AsyncKafkaConsumer<>(properties);
+            } else {
+                return new LegacyKafkaConsumer<>(properties);
+            }
+        });
+    }
+
+    public <K, V> Consumer<K, V> create(Properties properties,
+                                        Deserializer<K> keyDeserializer,
+                                        Deserializer<V> valueDeserializer) {
+        return createInternal(() -> {
+            if (useNewConsumer(properties)) {
+                return new AsyncKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+            } else {
+                return new LegacyKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+            }
+        });
+    }
+
+    public <K, V> Consumer<K, V> create(Map<String, Object> configs,
+                                        Deserializer<K> keyDeserializer,
+                                        Deserializer<V> valueDeserializer) {
+        return createInternal(() -> {
+            if (useNewConsumer(configs)) {
+                return new AsyncKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+            } else {
+                return new LegacyKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+            }
+        });
+    }
+
+    public <K, V> Consumer<K, V> create(ConsumerConfig config,
+                                        Deserializer<K> keyDeserializer,
+                                        Deserializer<V> valueDeserializer) {
+        return createInternal(() -> {
+            if (useNewConsumer(config.values())) {

Review Comment:
   I think that we should simplify the constructors in the delegates. It does 
not make sense to keep all of them.
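
   As a rough illustration of the idea, the sketch below funnels every 
   overload of the creator into one code path, so each delegate needs only a 
   single constructor. All type names (`ConsumerSketch`, `NewDelegateSketch`, 
   `LegacyDelegateSketch`, `CreatorSketch`) are hypothetical stand-ins, and a 
   plain `Map` is assumed in place of `ConsumerConfig` to keep the example 
   self-contained.
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Properties;

   // Hypothetical stand-in for the Consumer interface.
   interface ConsumerSketch {
       String kind();
   }

   // Hypothetical delegates: each one exposes a single constructor.
   final class NewDelegateSketch implements ConsumerSketch {
       NewDelegateSketch(Map<String, Object> config) { }
       public String kind() { return "new"; }
   }

   final class LegacyDelegateSketch implements ConsumerSketch {
       LegacyDelegateSketch(Map<String, Object> config) { }
       public String kind() { return "legacy"; }
   }

   final class CreatorSketch {
       // Converts Properties to a map once, then reuses the map-based path.
       ConsumerSketch create(Properties properties) {
           Map<String, Object> config = new HashMap<>();
           for (String name : properties.stringPropertyNames())
               config.put(name, properties.getProperty(name));
           return create(config);
       }

       // The only place where the implementation is chosen.
       ConsumerSketch create(Map<String, Object> config) {
           Object protocol = config.get("group.protocol");
           boolean useNew = protocol instanceof String
                   && ((String) protocol).equalsIgnoreCase("consumer");
           return useNew ? new NewDelegateSketch(config) : new LegacyDelegateSketch(config);
       }
   }
   ```
   With this shape, adding another input format touches only the creator and 
   leaves the delegates alone.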



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##########
@@ -211,4 +226,17 @@ else if (t instanceof KafkaException)
             throw new TimeoutException(e);
         }
     }
+
+    public static boolean maybeOverrideEnableAutoCommit(ConsumerConfig config) 
{

Review Comment:
   nit: Not your fault but I find the name of the method a little weird because 
it does not override anything. It just returns whether auto commit should be 
enabled or not.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java:
##########
@@ -2582,7 +2581,7 @@ private FetchResponse fetchResponse(TopicPartition 
partition, long fetchOffset,
         return fetchResponse(Collections.singletonMap(partition, fetchInfo));
     }
 
-    private KafkaConsumer<String, String> newConsumer(Time time,
+    private LegacyKafkaConsumer<String, String> newConsumer(Time time,
                                                       KafkaClient client,

Review Comment:
   nit: The indentation of the arguments is incorrect in a few methods.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * <p/>
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * <p/>
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+    /**
+     * This is it! This is the core logic. It's extremely rudimentary.
+     */
+    private static boolean useNewConsumer(Map<?, ?> configs) {
+        Object groupProtocol = configs.get("group.protocol");
+
+        // Takes care of both the null and type checks.
+        if (!(groupProtocol instanceof String))
+            return false;
+
+        return ((String) groupProtocol).equalsIgnoreCase("consumer");

Review Comment:
   We have an enum for this. It would be better to reuse it here.
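
   To make the suggestion concrete, here is a minimal sketch of an enum-based 
   check. `GroupProtocolSketch`, its constants, and `fromConfig` are 
   hypothetical names used for illustration only; the enum that already exists 
   in the codebase may look different.
   ```java
   import java.util.Locale;
   import java.util.Map;

   // Hypothetical stand-in for the existing protocol enum.
   enum GroupProtocolSketch {
       LEGACY("legacy"),
       CONSUMER("consumer");

       final String configName;

       GroupProtocolSketch(String configName) {
           this.configName = configName;
       }

       // Case-insensitive lookup. Null, non-String, and unknown values map to LEGACY.
       static GroupProtocolSketch fromConfig(Object value) {
           if (value instanceof String) {
               String normalized = ((String) value).toLowerCase(Locale.ROOT);
               for (GroupProtocolSketch protocol : values()) {
                   if (protocol.configName.equals(normalized))
                       return protocol;
               }
           }
           return LEGACY;
       }
   }

   final class DelegateChoiceSketch {
       // Same decision as the string comparison, expressed through the enum.
       static boolean useNewConsumer(Map<?, ?> configs) {
           return GroupProtocolSketch.fromConfig(configs.get("group.protocol"))
                   == GroupProtocolSketch.CONSUMER;
       }
   }
   ```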



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * <p/>
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * <p/>
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+    /**
+     * This is it! This is the core logic. It's extremely rudimentary.
+     */
+    private static boolean useNewConsumer(Map<?, ?> configs) {
+        Object groupProtocol = configs.get("group.protocol");
+
+        // Takes care of both the null and type checks.
+        if (!(groupProtocol instanceof String))
+            return false;

Review Comment:
   Could it be `null` based on the config definition?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##########
@@ -0,0 +1,2623 @@
+/*

Review Comment:
   For my understanding, this is basically a copy of the old KafkaConsumer 
without any changes, right?



##########
tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java:
##########
@@ -430,8 +430,9 @@ public void testConsume(final long prodTimeMs) throws 
Throwable {
                 () -> log.info("offsetsForTime = {}", offsetsForTime.result));
             // Whether or not offsetsForTimes works, beginningOffsets and 
endOffsets
             // should work.
-            consumer.beginningOffsets(timestampsToSearch.keySet());
-            consumer.endOffsets(timestampsToSearch.keySet());
+            Map<TopicPartition, Long> beginningOffsets = 
consumer.beginningOffsets(timestampsToSearch.keySet());
+            Map<TopicPartition, Long> endingOffsets = 
consumer.endOffsets(timestampsToSearch.keySet());
+            log.trace("beginningOffsets: {}, endingOffsets: {}", 
beginningOffsets, endingOffsets);

Review Comment:
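   This is weird: the returned offsets are now assigned and trace-logged even 
   though this test did not need them before.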



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java:
##########
@@ -164,7 +163,7 @@
  * Note to future authors in this class. If you close the consumer, close with 
DURATION.ZERO to reduce the duration of
  * the test.
  */
-public class KafkaConsumerTest {
+public class LegacyKafkaConsumerTest {

Review Comment:
   For my understanding, do we have similar tests for the new consumer? Or do 
we need to run a subset of those for it?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * <p/>
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * <p/>
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+    /**
+     * This is it! This is the core logic. It's extremely rudimentary.
+     */
+    private static boolean useNewConsumer(Map<?, ?> configs) {

Review Comment:
   I made a similar comment earlier. Would it make sense to create a 
   `ConsumerConfig`, validate it, and then determine whether we should use the 
   old or the new consumer?
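
   A minimal sketch of that ordering, under stated assumptions: 
   `ValidatedConfigSketch` is a hypothetical stand-in for a validated config 
   object (it is not `ConsumerConfig`), and both the allowed values and the 
   default of `legacy` are assumptions made for the example.
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Locale;
   import java.util.Map;
   import java.util.Properties;

   // Hypothetical validated view of the raw configuration.
   final class ValidatedConfigSketch {
       static final String GROUP_PROTOCOL = "group.protocol";
       // Assumed set of legal values for this sketch.
       private static final List<String> ALLOWED = Arrays.asList("legacy", "consumer");

       private final String groupProtocol;

       // Validation happens once, here, before any implementation is chosen.
       ValidatedConfigSketch(Map<?, ?> raw) {
           Object value = raw.get(GROUP_PROTOCOL);
           // Assumed default when the key is absent.
           String text = value == null
                   ? "legacy"
                   : String.valueOf(value).toLowerCase(Locale.ROOT);
           if (!ALLOWED.contains(text))
               throw new IllegalArgumentException(
                       "Invalid value for " + GROUP_PROTOCOL + ": " + value);
           this.groupProtocol = text;
       }

       // Properties input is normalized into a map first.
       static ValidatedConfigSketch from(Properties properties) {
           Map<String, Object> copy = new HashMap<>();
           for (String name : properties.stringPropertyNames())
               copy.put(name, properties.getProperty(name));
           return new ValidatedConfigSketch(copy);
       }

       // After validation the value is never null and always a known protocol.
       boolean useNewConsumer() {
           return "consumer".equals(groupProtocol);
       }
   }
   ```
   Once the value has passed validation, the decision no longer needs its own 
   null or type checks.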



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -278,22 +283,22 @@ public PrototypeAsyncConsumer(final Time time,
         }
     }
 
-    public PrototypeAsyncConsumer(LogContext logContext,
-                                  String clientId,
-                                  Deserializers<K, V> deserializers,
-                                  FetchBuffer fetchBuffer,
-                                  FetchCollector<K, V> fetchCollector,
-                                  ConsumerInterceptors<K, V> interceptors,
-                                  Time time,
-                                  ApplicationEventHandler 
applicationEventHandler,
-                                  BlockingQueue<BackgroundEvent> 
backgroundEventQueue,
-                                  Metrics metrics,
-                                  SubscriptionState subscriptions,
-                                  ConsumerMetadata metadata,
-                                  long retryBackoffMs,
-                                  int defaultApiTimeoutMs,
-                                  List<ConsumerPartitionAssignor> assignors,
-                                  String groupId) {
+    public AsyncKafkaConsumer(LogContext logContext,

Review Comment:
   +1 for making it package private.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig {
      */
     static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
"internal.leave.group.on.close";
 
-    /**
-     * <code>internal.throw.on.fetch.stable.offset.unsupported</code>
-     * Whether or not the consumer should throw when the new stable offset 
feature is supported.
-     * If set to <code>true</code> then the client shall crash upon hitting it.
-     * The purpose of this flag is to prevent unexpected broker downgrade 
which makes
-     * the offset fetch protection against pending commit invalid. The safest 
approach
-     * is to fail fast to avoid introducing correctness issue.
-     *
-     * <p>
-     * Note: this is an internal configuration and could be changed in the 
future in a backward incompatible way
-     *
-     */
-    static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = 
"internal.throw.on.fetch.stable.offset.unsupported";

Review Comment:
   I am not fully convinced by moving this one to ConsumerUtils. It seems to me 
   that we are putting it a bit "nowhere". I also noticed that we have 
   `LEAVE_GROUP_ON_CLOSE_CONFIG` which is also an internal one. Do you know why we 
   don't have an issue with this one? In the end, I wonder if we should rather 
   create an `InternalConsumerConfig` class for those internal configs.
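
   A possible shape for such a class is sketched below. The two key strings 
   are copied from the diff above; the class name carries a `Sketch` suffix to 
   mark it as hypothetical, and the `isInternal` helper, together with the 
   `internal.` prefix convention it relies on, is an assumption based only on 
   these two keys.
   ```java
   // Hypothetical holder for consumer configs that are not part of the public API.
   final class InternalConsumerConfigSketch {

       static final String LEAVE_GROUP_ON_CLOSE_CONFIG =
               "internal.leave.group.on.close";

       static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED =
               "internal.throw.on.fetch.stable.offset.unsupported";

       // Constants only; no instances.
       private InternalConsumerConfigSketch() { }

       // Assumed convention: internal keys share the "internal." prefix.
       static boolean isInternal(String key) {
           return key != null && key.startsWith("internal.");
       }
   }
   ```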



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
