Re: [PR] [fix][broker] Create new ledger after the current ledger is closed [pulsar]

2024-03-21 Thread via GitHub


codelipenghui merged PR #22034:
URL: https://github.com/apache/pulsar/pull/22034


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated: [fix][broker] Create new ledger after the current ledger is closed (#22034)

2024-03-21 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d0ca9835cf9 [fix][broker] Create new ledger after the current ledger 
is closed (#22034)
d0ca9835cf9 is described below

commit d0ca9835cf972ce156bd4a1fc5d109482330857d
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Fri Mar 22 11:52:47 2024 +0800

[fix][broker] Create new ledger after the current ledger is closed (#22034)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  22 ++--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  33 --
 .../mledger/impl/ManagedLedgerFactoryTest.java |   2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 111 +++-
 .../mledger/impl/NonDurableCursorTest.java |  17 ++--
 .../mledger/impl/ShadowManagedLedgerImplTest.java  |   5 +-
 .../broker/service/BacklogQuotaManagerTest.java|  13 ++-
 .../broker/service/BrokerBkEnsemblesTests.java |  12 +--
 .../broker/service/BrokerBookieIsolationTest.java  | 112 ++---
 .../broker/service/ConsumedLedgersTrimTest.java|   6 +-
 .../client/impl/ProducerConsumerInternalTest.java  |  44 
 12 files changed, 276 insertions(+), 103 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9c3598f46ef..8b13fc0f342 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1274,7 +1274,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 if (proposedReadPosition.equals(PositionImpl.EARLIEST)) {
 newReadPosition = ledger.getFirstPosition();
 } else if (proposedReadPosition.equals(PositionImpl.LATEST)) {
-newReadPosition = ledger.getLastPosition().getNext();
+newReadPosition = 
ledger.getNextValidPosition(ledger.getLastPosition());
 } else {
 newReadPosition = proposedReadPosition;
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c839ee6f77c..1c0a0465507 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1754,10 +1754,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
 maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
 
-if (!pendingAddEntries.isEmpty()) {
-// Need to create a new ledger to write pending entries
-createLedgerAfterClosed();
-}
+createLedgerAfterClosed();
 }
 
 @Override
@@ -1812,7 +1809,6 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 }
 
 ledgerClosed(lh);
-createLedgerAfterClosed();
 }
 }, null);
 }
@@ -2649,7 +2645,16 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 } else {
 PositionImpl slowestReaderPosition = 
cursors.getSlowestReaderPosition();
 if (slowestReaderPosition != null) {
-slowestReaderLedgerId = 
slowestReaderPosition.getLedgerId();
+// The slowest reader position is the mark delete position.
+// If the slowest reader position point the last entry in 
the ledger x,
+// the slowestReaderLedgerId should be x + 1 and the 
ledger x could be deleted.
+LedgerInfo ledgerInfo = 
ledgers.get(slowestReaderPosition.getLedgerId());
+if (ledgerInfo != null && ledgerInfo.getLedgerId() != 
currentLedger.getId()
+&& ledgerInfo.getEntries() == 
slowestReaderPosition.getEntryId() + 1) {
+slowestReaderLedgerId = 
slowestReaderPosition.getLedgerId() + 1;
+} else {
+slowestReaderLedgerId = 
slowestReaderPosition.getLedgerId();
+}
 } else {
 promise.completeExceptionally(new 
ManagedLedgerException("Couldn't find reader position"));
 trimmerMutex.unlock();
@@ -3693,7 +3698,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 PositionImpl skippedPosition = 
position.getPositionAfterEntries(skippedEntryNum);
 

[PR] [improve][fn] Support OAuth2 in Go instance [pulsar]

2024-03-21 Thread via GitHub


jiangpengcheng opened a new pull request, #22323:
URL: https://github.com/apache/pulsar/pull/22323

   
   
   
   
   Fixes #22322 
   
   
   
   
   
   
   ### Motivation
   
   The Go instance doesn't support OAuth2 authentication yet; it would be 
better to support it.
   
   ### Modifications
   
   Support OAuth2 in the Go instance.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   - [x] This change is a trivial rework / code cleanup without any test 
coverage.
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/jiangpengcheng/pulsar/pull/30
   
   
   





[I] Support OAuth2 authentication in Go instance of pulsar functions [pulsar]

2024-03-21 Thread via GitHub


jiangpengcheng opened a new issue, #22322:
URL: https://github.com/apache/pulsar/issues/22322

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   The Pulsar Go client already supports OAuth2; it would be better to support 
OAuth2 in the Go instance too.
   
   ### Solution
   
   _No response_
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!





Re: [PR] [fix][broker] Fix ResourceGroups loading [pulsar]

2024-03-21 Thread via GitHub


nodece commented on PR #21781:
URL: https://github.com/apache/pulsar/pull/21781#issuecomment-2014266913

   /pulsarbot rerun-failure-checks





(pulsar-client-python) annotated tag v3.5.0-candidate-1 updated (13c8c4e -> a094fdd)

2024-03-21 Thread xyz
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to annotated tag v3.5.0-candidate-1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


*** WARNING: tag v3.5.0-candidate-1 was modified! ***

from 13c8c4e  (commit)
  to a094fdd  (tag)
 tagging 13c8c4e621737395d6b9648e2bf0150d66eedbe6 (commit)
  by Yunze Xu
  on Fri Mar 22 10:15:15 2024 +0800

- Log -
Release v3.5.0 candidate 1
-BEGIN PGP SIGNATURE-

iQJDBAABCgAtFiEEn+m0+KLf1EiRy6J0Qrtq+2zSb6YFAmX86bMPHHh5ekBhcGFj
aGUub3JnAAoJEEK7avts0m+mSusQAIGcZb6wL1LhQDRfAIcV+1RJRTjWeSXs5+Ha
6cD++IFac3w9+fbKYu9FBzaqG5FzPXWwSbO618OWYPR0BgOrKFYS5+mGWjEZFIaZ
VK7pXvMrY6qHBSPWtXuGvqE9dVzB8anzo2Bm8XeuZbYKSJUPaqpuSmLtP0LgKgH/
hHski8stKFAwPFXl+erPOJKra9LeNTswGysnQQyqtuMkDTLkfLtpmET55VqeyTQc
DTB8gJxUCJxMsMVJ/qNxcjDvOc3vHeH6lHmEE87CPvaza8BboQ8MP9qU8tsueqZB
QYdCxbJyPNjjk+YWfdKziybOmzG/9JneZN8TB1vQJqnnp6RwbuAWpWtvVqJC9cU0
7PKJq9GpBY7cq6sK4rwMddx47lJbM9afQawW2C7TefEzthgQZGqr7IgNUpL0d5H7
uIlvcdLtB46Layt0i5ECnIhCdtOJJFc3uz9c1LneRKAArfefhbr1Hni94MK5HkHy
noPMyFgiTPvnXHQUipzNRsN6uIbP3fHnUKZ9nqZIL/0lz1t459vFOhMpusAm4AgP
anku6oqaZ2NHI9geVDhPJWEsRnEyS/ohB+2FV2QbUt+bIRTXUuXkjSQbiATN74qi
LEJnLSCyQ7mu/7uzsKzrF1gxJJVP4XF0z1dSOIiDgOxJBgwYCqMtKYn4ipMVudLd
mPtNPSVC
=/agE
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



(pulsar-client-python) branch branch-3.5 created (now 13c8c4e)

2024-03-21 Thread xyz
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


  at 13c8c4e  Bump version to 3.5.0

This branch includes the following new commits:

 new 13c8c4e  Bump version to 3.5.0

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(pulsar-client-python) 01/01: Bump version to 3.5.0

2024-03-21 Thread xyz
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git

commit 13c8c4e621737395d6b9648e2bf0150d66eedbe6
Author: Yunze Xu 
AuthorDate: Fri Mar 22 10:14:44 2024 +0800

Bump version to 3.5.0
---
 pulsar/__about__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar/__about__.py b/pulsar/__about__.py
index e891b1b..aca9e7b 100644
--- a/pulsar/__about__.py
+++ b/pulsar/__about__.py
@@ -16,4 +16,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-__version__='3.5.0a1'
+__version__='3.5.0'



(pulsar-site) branch main updated: Docs sync done from apache/pulsar (#69c45ad)

2024-03-21 Thread urfree
This is an automated email from the ASF dual-hosted git repository.

urfree pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new 2c00c5e224e3 Docs sync done from apache/pulsar (#69c45ad)
2c00c5e224e3 is described below

commit 2c00c5e224e383aecaff6dc68e4d81550911c894
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 22 01:32:12 2024 +

Docs sync done from apache/pulsar (#69c45ad)
---
 static/reference/next/pulsar-perf/pulsar-perf.md | 290 +++
 1 file changed, 145 insertions(+), 145 deletions(-)

diff --git a/static/reference/next/pulsar-perf/pulsar-perf.md 
b/static/reference/next/pulsar-perf/pulsar-perf.md
index 004c75bf2421..dd67609564a7 100644
--- a/static/reference/next/pulsar-perf/pulsar-perf.md
+++ b/static/reference/next/pulsar-perf/pulsar-perf.md
@@ -9,62 +9,62 @@ $ pulsar-perf produce [options]
 
 |Flag|Description|Default|
 |---|---|---|
-| `-ch, --chunking` | Should split the message and publish in chunks if 
message size is larger than allowed max size|false|
-| `-threads, --num-test-threads` | Number of test threads|1|
-| `--separator` | Separator between the topic and topic number|-|
-| `-au, --admin-url` | Pulsar Admin URL|null|
-| `-db, --disable-batching` | Disable batching if true|false|
+| `-h, --help` | Print help message|false|
 | `-cf, --conf-file` | Pulsar configuration file|null|
-| `-pn, --producer-name` | Producer Name|null|
-| `-fc, --format-class` | Custom Formatter class 
name|org.apache.pulsar.testclient.DefaultMessageFormatter|
-| `-k, --encryption-key-name` | The public key name to encrypt payload|null|
-| `-nmt, --numMessage-perTransaction` | The number of messages sent by a 
transaction. (After --txn-enable setting to true, -nmt takes effect)|50|
-| `--tls-enable-hostname-verification` | Enable TLS hostname verification|null|
+| `-u, --service-url` | Pulsar Service URL|null|
+| `--auth-plugin` | Authentication plugin class name|null|
+| `--auth-params` | Authentication parameters, whose format is determined by 
the implementation of method `configure` in authentication plugin class, for 
example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}".|null|
+| `--trust-cert-file` | Path for the trusted TLS certificate file||
 | `--tls-allow-insecure` | Allow insecure TLS connection|null|
-| `-bw, --busy-wait` | Enable Busy-Wait on the Pulsar client|false|
+| `--tls-enable-hostname-verification` | Enable TLS hostname verification|null|
+| `-c, --max-connections` | Max number of TCP connections to a single broker|1|
 | `-i, --stats-interval-seconds` | Statistics Interval Seconds. If 0, 
statistics will be disabled|0|
-| `-np, --partitions` | Create partitioned topics with the given number of 
partitions, set 0 to not try to create the topic|null|
-| `-txn, --txn-enable` | Enable or disable the transaction|false|
-| `-fp, --format-payload` | Format %i as a message index in the stream from 
producer and/or %t as the timestamp nanoseconds.|false|
+| `-ioThreads, --num-io-threads` | Set the number of threads to be used for 
handling connections to brokers. The default value is 1.|1|
+| `-bw, --busy-wait` | Enable Busy-Wait on the Pulsar client|false|
+| `--listener-name` | Listener name for the broker.|null|
+| `-lt, --num-listener-threads` | Set the number of threads to be used for 
message listeners|1|
+| `-mlr, --max-lookup-request` | Maximum number of lookup requests allowed on 
each broker connection to prevent overloading a broker|5|
+| `--proxy-url` | Proxy-server URL to which to connect.|null|
 | `--proxy-protocol` | Proxy protocol to select type of routing at proxy.|null|
-| `-dr, --delay-range` | Mark messages with a given delay by a random number 
of seconds. this value between the specified origin (inclusive) and the 
specified bound (exclusive). e.g. 1,300|null|
+| `-ml, --memory-limit` | Configure the Pulsar client memory limit (eg: 32M, 
64M)|0|
 | `-t, --num-topics, --num-topic` | Number of topics.  Must matchthe given 
number of topic arguments.|1|
-| `-v, --encryption-key-value-file` | The file which contains the public key 
to encrypt payload|null|
-| `-bb, --batch-max-bytes` | Maximum number of bytes per batch|4194304|
-| `-u, --service-url` | Pulsar Service URL|null|
+| `-threads, --num-test-threads` | Number of test threads|1|
 | `-r, --rate` | Publish rate msg/s across topics|100|
-| `-ef, --exit-on-failure` | Exit from the process on publish failure 
(default: disable)|false|
+| `-s, --size` | Message size (bytes)|1024|
+| `-n, --num-producers` | Number of producers (per topic)|1|
+| `--separator` | Separator between the topic and topic number|-|
+| `--send-timeout` | Set the sendTimeout value default 0 to keep compatibility 
with previous version of pulsar-perf|0|
+| `-pn, --producer-name` | Producer Name|null|
+| `-au, --admin-url` | Pulsar 

Re: [PR] [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf [pulsar]

2024-03-21 Thread via GitHub


Technoboy- merged PR #22303:
URL: https://github.com/apache/pulsar/pull/22303





(pulsar) branch master updated: [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf (#22303)

2024-03-21 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 69c45ad5300 [improve][cli] PIP-343: Use picocli instead of jcommander 
in pulsar-perf (#22303)
69c45ad5300 is described below

commit 69c45ad5300e36a62a923b8eaa58aab99c6e02fb
Author: crossoverJie 
AuthorDate: Fri Mar 22 09:12:37 2024 +0800

[improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf 
(#22303)

Co-authored-by: Zixuan Liu 
---
 .../cli/converters/ByteUnitToLongConverter.java| 39 -
 .../pulsar/cli/converters/ByteConversionTest.java  |  9 +-
 pulsar-testclient/pom.xml  |  4 +-
 .../proxy/socket/client/PerformanceClient.java | 65 +++
 .../apache/pulsar/testclient/BrokerMonitor.java| 30 +++
 .../testclient/CmdGenerateDocumentation.java   | 67 +--
 .../pulsar/testclient/LoadSimulationClient.java| 34 
 .../testclient/LoadSimulationController.java   | 68 +++
 .../pulsar/testclient/ManagedLedgerWriter.java | 57 ++---
 .../testclient/PerformanceBaseArguments.java   | 59 +++--
 .../pulsar/testclient/PerformanceConsumer.java | 65 ---
 .../pulsar/testclient/PerformanceProducer.java | 96 +++---
 .../pulsar/testclient/PerformanceReader.java   | 19 +++--
 .../testclient/PerformanceTopicListArguments.java  | 10 ++-
 .../pulsar/testclient/PerformanceTransaction.java  | 45 +-
 ...or.java => PositiveNumberParameterConvert.java} | 15 ++--
 .../pulsar/testclient/GenerateDocumentionTest.java | 37 +
 17 files changed, 377 insertions(+), 342 deletions(-)

diff --git 
a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java
 
b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java
deleted file mode 100644
index 6170fb489d4..000
--- 
a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.cli.converters;
-
-import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck;
-import com.beust.jcommander.converters.BaseConverter;
-
-public class ByteUnitToLongConverter extends BaseConverter {
-
-public ByteUnitToLongConverter(String optionName) {
-super(optionName);
-}
-
-@Override
-public Long convert(String argStr) {
-return parseBytes(argStr);
-}
-
-Long parseBytes(String argStr) {
-emptyCheck(getOptionName(), argStr);
-return ByteUnitUtil.validateSizeString(argStr);
-}
-}
diff --git 
a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
 
b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
index 283e94bfb9c..6e7a2e6d7e3 100644
--- 
a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
+++ 
b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.cli.converters;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertThrows;
 import org.apache.pulsar.cli.converters.picocli.ByteUnitToIntegerConverter;
+import org.apache.pulsar.cli.converters.picocli.ByteUnitToLongConverter;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import picocli.CommandLine.TypeConversionException;
@@ -59,8 +60,8 @@ public class ByteConversionTest {
 }
 
 @Test(dataProvider = "successfulByteUnitUtilTestCases")
-public void testSuccessfulByteUnitToLongConverter(String input, long 
expected) {
-ByteUnitToLongConverter converter = new 
ByteUnitToLongConverter("optionName");
+public void testSuccessfulByteUnitToLongConverter(String input, long 
expected) throws Exception{
+ByteUnitToLongConverter converter = new ByteUnitToLongConverter();
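
The test change above reflects the converter moving from a jcommander `BaseConverter` (constructed with an option name) to a picocli-style converter with a no-argument constructor; the added `throws Exception` is consistent with picocli's `ITypeConverter.convert` being declared to throw `Exception`. As a rough illustration of what converting a size string such as `32M` to a `long` can involve, here is a standalone sketch. `SizeStringSketch` and `parseBytes` are hypothetical names, the 1024-based suffixes are an assumption, and none of this is taken from `ByteUnitUtil`.

```java
// Hypothetical sketch of size-string parsing; not the Pulsar implementation.
final class SizeStringSketch {

    // Parses strings like "1024", "4k", "32M" into a byte count.
    // Assumption: suffixes K/M/G/T are powers of 1024 and case-insensitive.
    static long parseBytes(String text) {
        if (text == null || text.trim().isEmpty()) {
            throw new IllegalArgumentException("size must not be empty");
        }
        String s = text.trim();
        char last = Character.toUpperCase(s.charAt(s.length() - 1));
        long multiplier;
        switch (last) {
            case 'K': multiplier = 1L << 10; break;
            case 'M': multiplier = 1L << 20; break;
            case 'G': multiplier = 1L << 30; break;
            case 'T': multiplier = 1L << 40; break;
            default:  multiplier = 1L; break; // no unit suffix: plain bytes
        }
        String digits = multiplier == 1L ? s : s.substring(0, s.length() - 1);
        // multiplyExact throws ArithmeticException on overflow instead of wrapping.
        return Math.multiplyExact(Long.parseLong(digits.trim()), multiplier);
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("32M"));
    }
}
```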
 

Re: [PR] [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet [pulsar]

2024-03-21 Thread via GitHub


codelipenghui commented on code in PR #22283:
URL: https://github.com/apache/pulsar/pull/22283#discussion_r1534887661


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##
@@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
 commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
 "Consumer is already present on the 
connection");
 } else if 
(existingConsumerFuture.isCompletedExceptionally()){
+log.warn("[{}][{}][{}] A failed consumer with id is 
already present on the connection,"
++ " consumerId={}", remoteAddress, topicName, 
subscriptionName, consumerId);
 ServerError error = 
getErrorCodeWithErrorLog(existingConsumerFuture, true,
-String.format("Consumer subscribe failure. 
remoteAddress: %s, subscription: %s",
+String.format("A failed consumer with id is 
already present on the connection."
++ " remoteAddress: %s, 
subscription: %s",
 remoteAddress, subscriptionName));
-consumers.remove(consumerId, existingConsumerFuture);
+/**
+ * This feature may was failed due to the client 
closed a in-progress subscribing.
+ * See {@link 
#handleCloseConsumer(CommandCloseConsumer)}
+ * Do not remove the failed feature at current line, 
it will be removed after the progress of

Review Comment:
   ```suggestion
* Do not remove the future feature at current line, 
it will be removed after the progress of
   ```



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##
@@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
 commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
 "Consumer is already present on the 
connection");
 } else if 
(existingConsumerFuture.isCompletedExceptionally()){
+log.warn("[{}][{}][{}] A failed consumer with id is 
already present on the connection,"
++ " consumerId={}", remoteAddress, topicName, 
subscriptionName, consumerId);
 ServerError error = 
getErrorCodeWithErrorLog(existingConsumerFuture, true,
-String.format("Consumer subscribe failure. 
remoteAddress: %s, subscription: %s",
+String.format("A failed consumer with id is 
already present on the connection."
++ " remoteAddress: %s, 
subscription: %s",
 remoteAddress, subscriptionName));
-consumers.remove(consumerId, existingConsumerFuture);
+/**
+ * This feature may was failed due to the client 
closed a in-progress subscribing.
+ * See {@link 
#handleCloseConsumer(CommandCloseConsumer)}
+ * Do not remove the failed feature at current line, 
it will be removed after the progress of
+ * the previous subscribing is done.
+ * Before the previous subscribing is done, the new 
subscribe request will always fail.
+ * This mechanism is in order to prevent more complex 
logic to handle the race conditions.
+ */
 commandSender.sendErrorResponse(requestId, error,
-"Consumer that failed is already present on 
the connection");
+"A failed consumer is already present on the 
connection");

Review Comment:
   Hmm, it looks like we don't need to change it.
   
   `A failed consumer` can be understood as a different consumer,
   but `Consumer that ...` usually means the same consumer.
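
The mechanism described in the Javadoc under review (keep the failed future in the map, and reject new subscribe requests until the earlier subscribe has finished) can be sketched in isolation. This is a simplified illustration with hypothetical names (`SubscribeRaceSketch`, `closeWhileSubscribing`, `finishPreviousSubscribe`); it is not the `ServerCnx` code and omits the response handling.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the "leave the failed future in place" approach.
final class SubscribeRaceSketch {
    private final ConcurrentHashMap<Long, CompletableFuture<String>> consumers =
            new ConcurrentHashMap<>();

    // Returns a short status describing how the subscribe request was handled.
    String handleSubscribe(long consumerId) {
        CompletableFuture<String> existing =
                consumers.putIfAbsent(consumerId, new CompletableFuture<>());
        if (existing == null) {
            return "subscribing";
        }
        if (existing.isCompletedExceptionally()) {
            // Deliberately NOT removed here: the earlier subscribe flow removes it.
            return "rejected: failed consumer still present";
        }
        return existing.isDone() ? "already subscribed" : "rejected: subscribe in progress";
    }

    // Models the client closing a consumer whose subscribe is still in progress.
    void closeWhileSubscribing(long consumerId) {
        CompletableFuture<String> f = consumers.get(consumerId);
        if (f != null) {
            f.completeExceptionally(new IllegalStateException("closed before subscribe completed"));
        }
    }

    // Models the earlier subscribe flow finishing and cleaning up its own failed entry.
    void finishPreviousSubscribe(long consumerId) {
        CompletableFuture<String> f = consumers.get(consumerId);
        if (f != null && f.isCompletedExceptionally()) {
            consumers.remove(consumerId, f);
        }
    }
}
```

Because only one code path ever removes the failed entry, a retry arriving in between is rejected rather than racing with the cleanup.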



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##
@@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
 commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
 "Consumer is already present on the 
connection");
 } else if 
(existingConsumerFuture.isCompletedExceptionally()){
+log.warn("[{}][{}][{}] A failed consumer with id is 
already present on the connection,"
++ " consumerId={}", remoteAddress, topicName, 
subscriptionName, consumerId);
 

[PR] [fix][broker] Update TransferShedder underloaded broker check to consider max loaded broker's msgThroughputEMA and update IsExtensibleLoadBalancerImpl check [pulsar]

2024-03-21 Thread via GitHub


heesung-sn opened a new pull request, #22321:
URL: https://github.com/apache/pulsar/pull/22321

   
   
   
   
   
   
   ### Motivation
   
   
   
   Checking for zero traffic is not a practical way to identify whether a 
broker is too underloaded. There might be some health-check traffic, which can 
make the broker's traffic slightly greater than 1 byte/s. We should instead use 
a relative measurement to check whether a broker is significantly underloaded.
   
   ### Modifications
   
   
   
   - Update TransferShedder underloaded broker check to consider max loaded 
broker's msgThroughputEMA
   - Delete `isLoadManagerExtensionEnabled(ServiceConfiguration conf)` so that 
only the `isLoadManagerExtensionEnabled(PulsarService pulsar)` API is used
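
   As a rough illustration of the motivation, an absolute zero-traffic check can be contrasted with a check relative to the most loaded broker. The PR description does not give the formula, so everything in this sketch is an assumption: the class `UnderloadCheckSketch`, both method names, and the `fraction` parameter are hypothetical.

```java
// Hypothetical sketch contrasting an absolute and a relative underload check.
final class UnderloadCheckSketch {

    // Absolute check: any background traffic at all defeats it.
    static boolean isIdle(double brokerThroughputEma) {
        return brokerThroughputEma == 0.0;
    }

    // Relative check: underloaded if well below the most loaded broker.
    // Assumption: "fraction" is a tunable ratio such as 0.01.
    static boolean isUnderloaded(double brokerThroughputEma, double maxThroughputEma,
                                 double fraction) {
        if (maxThroughputEma <= 0.0) {
            return false; // no traffic anywhere, so there is nothing to compare against
        }
        return brokerThroughputEma < maxThroughputEma * fraction;
    }

    public static void main(String[] args) {
        // A broker carrying only 2 bytes/s of health-check traffic.
        System.out.println(isIdle(2.0));
        System.out.println(isUnderloaded(2.0, 1000.0, 0.01));
    }
}
```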
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   





Re: [PR] Add support for HasMessageAvailable [pulsar-dotpulsar]

2024-03-21 Thread via GitHub


smbecker commented on PR #207:
URL: https://github.com/apache/pulsar-dotpulsar/pull/207#issuecomment-2013677434

   > In regards to "ValueTask HasMessageAvailable(CancellationToken 
cancellationToken = default);" I'm not sure why that is needed and why it is 
async?
   
   My original intent was to replicate something similar to `EndOfStreamAction` 
from the [pulsar-client-reactive](https://github.com/apache/pulsar-client-reactive/blob/b8b42df975c4bb2cd275936402d6439087695554/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java#L65) project.
   





Re: [PR] Add support for HasMessageAvailable [pulsar-dotpulsar]

2024-03-21 Thread via GitHub


smbecker commented on PR #207:
URL: https://github.com/apache/pulsar-dotpulsar/pull/207#issuecomment-2013672203

   > In regards to "bool HasReachedEndOfTopic();", this is a state and could be 
checked that way.
   
   It is currently not possible to check the current state, which was the 
purpose of [this change](https://github.com/apache/pulsar-dotpulsar/pull/207/files#diff-dd82a6892bf52f24ca2167d247198fbd63b6126383a09abb28280636ab8303bbR66). 
Also, this allows a consistent way of checking that covers both readers and 
consumers, which seems to line up with the purpose of the `IReceive` 
interface. Should I move the changes to `IStateChanged` to a separate PR? 
Should I remove this method in favor of just allowing checking the current 
state?
   
   > In regards to "ValueTask HasMessageAvailable(CancellationToken 
cancellationToken = default);" I'm not sure why that is needed and why it is 
async? If you want to call "HasMessageAvailable" to make sure that Receive is 
not blocked, then that can not be guaranteed, because HasMessageAvailable might 
return true, but before you get to call Receive the Reader/Consumer can have 
lost the connection or faulted. Are that you really need something like this: 
bool TryReceive(out Message message)?
   
   I was actually looking for something more similar to `TryReceive` but saw 
`hasMessageAvailableAsync` in the Java Client. However, it doesn't simply 
answer "Are there any messages that are already downloaded?"; it also answers 
"Are there any messages that could be downloaded from the broker?". The Java 
Client actually makes a call to `GetLastMessageId` to determine whether the 
server has any additional messages that could be downloaded. Should I add a 
`TryReceive` that only attempts to receive from what has already been 
downloaded? Should I update the implementation to be more in line with the Java 
Client by using `GetLastMessageId`? 
   
   Please advise...
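
   The two questions being distinguished here can be sketched with a toy reader model. This is a hypothetical illustration only: `ReaderSketch` and its methods are invented names, message ids are plain `long` values, and the broker round trip behind `GetLastMessageId` is replaced by a field.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical toy model; not DotPulsar or Java client code.
final class ReaderSketch {
    private final Queue<Long> prefetched = new ArrayDeque<>(); // ids already downloaded
    private long lastReadId = -1;
    private long lastIdOnBroker = -1; // stand-in for a GetLastMessageId round trip

    void brokerPublishes(long id) { lastIdOnBroker = id; }

    void download(long id) { prefetched.add(id); }

    // Answers "is a message already downloaded?" and hands it over in the same step,
    // so nothing can change between the check and the receive.
    Optional<Long> tryReceive() {
        Long id = prefetched.poll();
        if (id != null) {
            lastReadId = id;
        }
        return Optional.ofNullable(id);
    }

    // Answers "could a message be obtained, locally or from the broker?".
    boolean hasMessageAvailable() {
        return !prefetched.isEmpty() || lastIdOnBroker > lastReadId;
    }
}
```

   In this model `hasMessageAvailable()` can return true while `tryReceive()` still returns empty, which is the gap between the two semantics raised in the quoted review comment.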
   
   





(pulsar) branch master updated: [improve][misc] Include native epoll library for Netty for arm64 (#22319)

2024-03-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 24e9437ce06 [improve][misc] Include native epoll library for Netty for 
arm64 (#22319)
24e9437ce06 is described below

commit 24e9437ce065613fd924a74f61b620d9fdc0058b
Author: Lari Hotari 
AuthorDate: Thu Mar 21 13:23:21 2024 -0700

[improve][misc] Include native epoll library for Netty for arm64 (#22319)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 1 +
 distribution/shell/src/assemble/LICENSE.bin.txt  | 1 +
 pulsar-common/pom.xml| 6 ++
 3 files changed, 8 insertions(+)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index dac03d966a6..cb99d62edfe 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -309,6 +309,7 @@ The Apache Software License, Version 2.0
 - io.netty-netty-resolver-dns-native-macos-4.1.105.Final-osx-x86_64.jar
 - io.netty-netty-transport-4.1.105.Final.jar
 - io.netty-netty-transport-classes-epoll-4.1.105.Final.jar
+- io.netty-netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar
 - io.netty-netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar
 - io.netty-netty-transport-native-unix-common-4.1.105.Final.jar
 - 
io.netty-netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 7ae4241dfb9..2b2f1c26be1 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -360,6 +360,7 @@ The Apache Software License, Version 2.0
 - netty-resolver-dns-4.1.105.Final.jar
 - netty-transport-4.1.105.Final.jar
 - netty-transport-classes-epoll-4.1.105.Final.jar
+- netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar
 - netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar
 - netty-transport-native-unix-common-4.1.105.Final.jar
 - netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 1b54a7aee2d..f93a9ef654a 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -99,6 +99,12 @@
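       <classifier>linux-x86_64</classifier>
     </dependency>
 
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-epoll</artifactId>
+      <classifier>linux-aarch_64</classifier>
+    </dependency>
+
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-transport-native-unix-common</artifactId>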



Re: [PR] [improve][misc] Include native epoll library for Netty for arm64 [pulsar]

2024-03-21 Thread via GitHub


lhotari merged PR #22319:
URL: https://github.com/apache/pulsar/pull/22319





Re: [PR] [fix][broker] Fix ResourceGroups loading [pulsar]

2024-03-21 Thread via GitHub


nodece commented on code in PR #21781:
URL: https://github.com/apache/pulsar/pull/21781#discussion_r1534195770


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java:
##
@@ -41,52 +53,73 @@
  * @see https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting;>Global-quotas
  *
  */
-public class ResourceGroupConfigListener implements Consumer {
+public class ResourceGroupConfigListener implements Consumer, 
AutoCloseable {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(ResourceGroupConfigListener.class);
 private final ResourceGroupService rgService;
 private final PulsarService pulsarService;
 private final ResourceGroupResources rgResources;
-private final ResourceGroupNamespaceConfigListener 
rgNamespaceConfigListener;
+private volatile ResourceGroupNamespaceConfigListener 
rgNamespaceConfigListener;
+private final ScheduledExecutorService executorService;
 
 public ResourceGroupConfigListener(ResourceGroupService rgService, 
PulsarService pulsarService) {
 this.rgService = rgService;
 this.pulsarService = pulsarService;
 this.rgResources = 
pulsarService.getPulsarResources().getResourcegroupResources();
-loadAllResourceGroups();
 this.rgResources.getStore().registerListener(this);
-rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(
-rgService, pulsarService, this);
+this.executorService =
+Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-resource-group-config"));
+execute(() -> loadAllResourceGroupsWithRetryAsync(0));
 }
 
-private void loadAllResourceGroups() {
-rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) 
-> {
-if (ex != null) {
-LOG.error("Exception when fetching resource groups", ex);
-return;
+@SneakyThrows
+private void loadAllResourceGroupsWithRetryAsync(long retry) {
+loadAllResourceGroupsAsync().exceptionally(e -> {
+long nextRetry = retry + 1;
+long delay = 500 * nextRetry;
+LOG.error("Failed to load all resource groups during 
initialization, retrying after {}ms", delay);
+schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), 
delay);
+throw new RuntimeException(e);
+}).thenAccept(__ -> {
+if (rgNamespaceConfigListener == null) {
+try{

Review Comment:
   try...catch is unnecessary.






Re: [PR] [fix][broker] Fix ResourceGroups loading [pulsar]

2024-03-21 Thread via GitHub


nodece commented on code in PR #21781:
URL: https://github.com/apache/pulsar/pull/21781#discussion_r1534138906


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java:
##
@@ -41,52 +53,73 @@
  * @see https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting;>Global-quotas
  *
  */
-public class ResourceGroupConfigListener implements Consumer {
+public class ResourceGroupConfigListener implements Consumer, 
AutoCloseable {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(ResourceGroupConfigListener.class);
 private final ResourceGroupService rgService;
 private final PulsarService pulsarService;
 private final ResourceGroupResources rgResources;
-private final ResourceGroupNamespaceConfigListener 
rgNamespaceConfigListener;
+private volatile ResourceGroupNamespaceConfigListener 
rgNamespaceConfigListener;
+private final ScheduledExecutorService executorService;
 
 public ResourceGroupConfigListener(ResourceGroupService rgService, 
PulsarService pulsarService) {
 this.rgService = rgService;
 this.pulsarService = pulsarService;
 this.rgResources = 
pulsarService.getPulsarResources().getResourcegroupResources();
-loadAllResourceGroups();
 this.rgResources.getStore().registerListener(this);
-rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(
-rgService, pulsarService, this);
+this.executorService =
+Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-resource-group-config"));
+execute(() -> loadAllResourceGroupsWithRetryAsync(0));
 }
 
-private void loadAllResourceGroups() {
-rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) 
-> {
-if (ex != null) {
-LOG.error("Exception when fetching resource groups", ex);
-return;
+@SneakyThrows
+private void loadAllResourceGroupsWithRetryAsync(long retry) {
+loadAllResourceGroupsAsync().exceptionally(e -> {
+long nextRetry = retry + 1;
+long delay = 500 * nextRetry;
+LOG.error("Failed to load all resource groups during 
initialization, retrying after {}ms", delay);
+schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), 
delay);
+throw new RuntimeException(e);
+}).thenAccept(__ -> {
+if (rgNamespaceConfigListener == null) {
+try{
+rgNamespaceConfigListener = new 
ResourceGroupNamespaceConfigListener(rgService, pulsarService, this);
+}catch (Exception e) {
+e.printStackTrace();
+}
 }
+});
+}
+
+private CompletableFuture loadAllResourceGroupsAsync() {
+return rgResources.listResourceGroupsAsync().thenComposeAsync(rgList 
-> {

Review Comment:
   `executorService` is only used to retry `loadAllResourceGroupsAsync()`.
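
For readers following the thread, here is a minimal sketch of the "retry on a dedicated scheduler" pattern this comment refers to. It is not the PR's code: `RetryingLoader`, `loadWithRetry`, and the `maxRetries` cap are hypothetical names introduced for illustration. Only the linearly growing delay (a base delay multiplied by the attempt number) mirrors the hunk quoted above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not part of Pulsar: retries an async load on a scheduler.
final class RetryingLoader {
    private final ScheduledExecutorService scheduler;
    private final long baseDelayMs;

    RetryingLoader(ScheduledExecutorService scheduler, long baseDelayMs) {
        this.scheduler = scheduler;
        this.baseDelayMs = baseDelayMs;
    }

    // Completes with the first successful result, or fails once maxRetries retries are used up.
    <T> CompletableFuture<T> loadWithRetry(Supplier<CompletableFuture<T>> loader, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(loader, 0, maxRetries, result);
        return result;
    }

    private <T> void attempt(Supplier<CompletableFuture<T>> loader, int retry, int maxRetries,
                             CompletableFuture<T> result) {
        loader.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retry >= maxRetries) {
                result.completeExceptionally(error);
            } else {
                // Linear backoff: the delay grows with each attempt.
                long delayMs = baseDelayMs * (retry + 1);
                scheduler.schedule(() -> attempt(loader, retry + 1, maxRetries, result),
                        delayMs, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

In this sketch the scheduler is touched only when a retry has to be delayed, which is the narrow role the comment describes. Whether the load itself should also run on that executor is the open question in the thread.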






Re: [PR] [fix][broker] Fix ResourceGroups loading [pulsar]

2024-03-21 Thread via GitHub


Technoboy- commented on code in PR #21781:
URL: https://github.com/apache/pulsar/pull/21781#discussion_r1534024939


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java:
##
@@ -41,52 +53,73 @@
  * @see https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting;>Global-quotas
  *
  */
-public class ResourceGroupConfigListener implements Consumer {
+public class ResourceGroupConfigListener implements Consumer, 
AutoCloseable {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(ResourceGroupConfigListener.class);
 private final ResourceGroupService rgService;
 private final PulsarService pulsarService;
 private final ResourceGroupResources rgResources;
-private final ResourceGroupNamespaceConfigListener 
rgNamespaceConfigListener;
+private volatile ResourceGroupNamespaceConfigListener 
rgNamespaceConfigListener;
+private final ScheduledExecutorService executorService;
 
 public ResourceGroupConfigListener(ResourceGroupService rgService, 
PulsarService pulsarService) {
 this.rgService = rgService;
 this.pulsarService = pulsarService;
 this.rgResources = 
pulsarService.getPulsarResources().getResourcegroupResources();
-loadAllResourceGroups();
 this.rgResources.getStore().registerListener(this);
-rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(
-rgService, pulsarService, this);
+this.executorService =
+Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-resource-group-config"));
+execute(() -> loadAllResourceGroupsWithRetryAsync(0));
 }
 
-private void loadAllResourceGroups() {
-rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) 
-> {
-if (ex != null) {
-LOG.error("Exception when fetching resource groups", ex);
-return;
+@SneakyThrows
+private void loadAllResourceGroupsWithRetryAsync(long retry) {
+loadAllResourceGroupsAsync().exceptionally(e -> {
+long nextRetry = retry + 1;
+long delay = 500 * nextRetry;
+LOG.error("Failed to load all resource groups during 
initialization, retrying after {}ms", delay);
+schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), 
delay);
+throw new RuntimeException(e);
+}).thenAccept(__ -> {
+if (rgNamespaceConfigListener == null) {
+try{
+rgNamespaceConfigListener = new 
ResourceGroupNamespaceConfigListener(rgService, pulsarService, this);
+}catch (Exception e) {
+e.printStackTrace();
+}
 }
+});
+}
+
+private CompletableFuture loadAllResourceGroupsAsync() {
+return rgResources.listResourceGroupsAsync().thenComposeAsync(rgList 
-> {

Review Comment:
   We can use the executor for this async call.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java:
##
@@ -41,52 +53,73 @@
  * @see https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting;>Global-quotas
  *
  */
-public class ResourceGroupConfigListener implements Consumer {
+public class ResourceGroupConfigListener implements Consumer, 
AutoCloseable {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(ResourceGroupConfigListener.class);
 private final ResourceGroupService rgService;
 private final PulsarService pulsarService;
 private final ResourceGroupResources rgResources;
-private final ResourceGroupNamespaceConfigListener 
rgNamespaceConfigListener;
+private volatile ResourceGroupNamespaceConfigListener 
rgNamespaceConfigListener;
+private final ScheduledExecutorService executorService;
 
 public ResourceGroupConfigListener(ResourceGroupService rgService, 
PulsarService pulsarService) {
 this.rgService = rgService;
 this.pulsarService = pulsarService;
 this.rgResources = 
pulsarService.getPulsarResources().getResourcegroupResources();
-loadAllResourceGroups();
 this.rgResources.getStore().registerListener(this);
-rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(
-rgService, pulsarService, this);
+this.executorService =
+Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-resource-group-config"));
+execute(() -> loadAllResourceGroupsWithRetryAsync(0));
 }
 
-private void loadAllResourceGroups() {
-rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) 
-> {
-if (ex != null) {
-LOG.error("Exception when fetching resource groups", ex);
-return;
+@SneakyThrows
+private void 

(pulsar-client-go) branch dependabot/go_modules/google.golang.org/protobuf-1.33.0 deleted (was 9023d880)

2024-03-21 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/go_modules/google.golang.org/protobuf-1.33.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


 was 9023d880 chore(deps): bump google.golang.org/protobuf from 1.30.0 to 
1.33.0

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



(pulsar-client-go) branch master updated: chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 (#1198)

2024-03-21 Thread zike
This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 393f80b4 chore(deps): bump google.golang.org/protobuf from 1.30.0 to 
1.33.0 (#1198)
393f80b4 is described below

commit 393f80b4b93faa36936380b643426026a2b2cd02
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 21 22:24:44 2024 +0800

chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 (#1198)

Bumps google.golang.org/protobuf from 1.30.0 to 1.33.0.

---
updated-dependencies:
- dependency-name: google.golang.org/protobuf
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] 
Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
---
 go.mod |  2 +-
 go.sum | 14 ++
 2 files changed, 3 insertions(+), 13 deletions(-)

diff --git a/go.mod b/go.mod
index 3257d510..51164871 100644
--- a/go.mod
+++ b/go.mod
@@ -27,7 +27,7 @@ require (
golang.org/x/mod v0.8.0
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
-   google.golang.org/protobuf v1.30.0
+   google.golang.org/protobuf v1.33.0
 )
 
 require (
diff --git a/go.sum b/go.sum
index 50a1ba3e..0a768a0e 100644
--- a/go.sum
+++ b/go.sum
@@ -321,8 +321,6 @@ golang.org/x/mod 
v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
 golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod 
h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
 golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38=
-golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
 golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
 golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -357,8 +355,6 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod 
h1:/O7V0waA8r7cgGh81R
 golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod 
h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod 
h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20220225172249-27dd8689420f 
h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
-golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
 golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -420,13 +416,9 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod 
h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a 
h1:ppl5mZgokTT8uPkmYOyEUmPTr3ypaKkg5eFOGrAmxxE=
-golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
 golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 
h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
-golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
 golang.org/x/term v0.13.0/go.mod 
h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -435,8 +427,6 @@ golang.org/x/text 
v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.7 

Re: [PR] chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 [pulsar-client-go]

2024-03-21 Thread via GitHub


RobertIndie merged PR #1198:
URL: https://github.com/apache/pulsar-client-go/pull/1198





Re: [PR] [fix] [broker] Close dispatchers stuck due to mismatch between dispatcher.consumerList and dispatcher.consumerSet [pulsar]

2024-03-21 Thread via GitHub


poorbarcode commented on PR #22270:
URL: https://github.com/apache/pulsar/pull/22270#issuecomment-2012412125

   > It looks like we don't need this PR if we follow this 
https://github.com/apache/pulsar/pull/22283#pullrequestreview-1951994551 to fix 
the issue. I have tested on my laptop.
   
   Changed the implementation: it now just adds defensive code so that the topic 
can still be unloaded. It removes the consumers that do not match between 
`consumerSet` and `consumerList`, and prints an error log.





Re: [PR] [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet [pulsar]

2024-03-21 Thread via GitHub


poorbarcode commented on code in PR #22283:
URL: https://github.com/apache/pulsar/pull/22283#discussion_r1533976409


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##
@@ -190,9 +190,15 @@ public synchronized CompletableFuture 
addConsumer(Consumer consumer) {
 }
 
 if (isConsumersExceededOnSubscription()) {
-log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", name);
+log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit {}",
+name, consumer);
 return FutureUtil.failedFuture(new 
ConsumerBusyException("Subscription reached max consumers limit"));
 }
+if (consumerSet.contains(consumer)) {

Review Comment:
   Switched to a new solution that prevents a subsequent subscribe until the previous 
one is done, which guarantees that there is no race condition from subscribing twice.
   
   > I see the purpose of consumerSet is to make consumerFlow and 
removeConsumer more efficient now. So it won't be an issue.
   
   The new solution fixed it






Re: [PR] [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet [pulsar]

2024-03-21 Thread via GitHub


poorbarcode commented on PR #22283:
URL: https://github.com/apache/pulsar/pull/22283#issuecomment-2012394478

   @codelipenghui 
   
   > We are using a failed future to ensure the new subscribe request from the 
same consumer should only be accepted once the old subscribe request is done. 
So I think the reasonable fix it to avoid the new subscribe request remove the 
failed future from the old subscribe request.
   > And we already have the mechanism to remove the failed future finally by 
the previous subscribe request.
   
   Good suggestion. I have already changed the solution: a subsequent subscribe 
request will always fail until the previous one is done, even if the previous 
request has timed out from the client's point of view, which guarantees that there is no 
race condition from subscribing twice.
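
A minimal sketch of the guard described above, for illustration only. `SubscribeGuard` and `tryBegin` are hypothetical names, not Pulsar's classes, and the sketch assumes an entry is dropped as soon as its request finishes, which simplifies whatever the broker really does with completed futures.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical guard: at most one in-flight subscribe request per consumer id.
final class SubscribeGuard {
    private final ConcurrentMap<Long, CompletableFuture<Void>> inFlight = new ConcurrentHashMap<>();

    // Returns the future the caller must complete, or empty if an earlier request is still pending.
    Optional<CompletableFuture<Void>> tryBegin(long consumerId) {
        CompletableFuture<Void> mine = new CompletableFuture<>();
        if (inFlight.putIfAbsent(consumerId, mine) != null) {
            // A previous request owns the slot; the newcomer must not remove or replace it.
            return Optional.empty();
        }
        // Only the owning request clears its own entry, on success or on failure.
        mine.whenComplete((ignored, error) -> inFlight.remove(consumerId, mine));
        return Optional.of(mine);
    }
}
```

The two-argument `remove(key, value)` matters here: it removes the entry only if it still maps to the owner's future, so a late or rejected request cannot clear someone else's slot.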





Re: [PR] [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet [pulsar]

2024-03-21 Thread via GitHub


poorbarcode commented on code in PR #22283:
URL: https://github.com/apache/pulsar/pull/22283#discussion_r1533976409


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##
@@ -190,9 +190,15 @@ public synchronized CompletableFuture 
addConsumer(Consumer consumer) {
 }
 
 if (isConsumersExceededOnSubscription()) {
-log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", name);
+log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit {}",
+name, consumer);
 return FutureUtil.failedFuture(new 
ConsumerBusyException("Subscription reached max consumers limit"));
 }
+if (consumerSet.contains(consumer)) {

Review Comment:
   Switched to a new solution that prevents a subsequent subscribe until the previous 
one is done, which guarantees that there is no race condition from subscribing twice.






[I] Another Multithreading Issues [pulsar-dotpulsar]

2024-03-21 Thread via GitHub


gungod2000 opened a new issue, #210:
URL: https://github.com/apache/pulsar-dotpulsar/issues/210

   Hello,
   
   I found another issue; simple code is below.
   
   Thanks
   
   

   
   private class ConsumerEntity
   {
   public IConsumer Consumer { get; set; }
   
   public string ThreadId {  get; set; } 
   }
   
   ConcurrentBag<ConsumerEntity> ConsumerPool = new ConcurrentBag<ConsumerEntity>();
   
   private void Form2_Load(object sender, EventArgs e)
   {
   // Just a demo.
   // Like this, it works normally: all messages are received and it is fast.
   for (int i = 0; i < 10; i++)
   {
   Task.Factory.StartNew(() =>
   {
   ReceiveMesage(Thread.CurrentThread.ManagedThreadId.ToString());
   });
   }
   
   // [issue here]
   // But like this, receiving messages in a child task of a task works badly:
   // receiving is very slow and many consumers receive no messages!
   for (int i = 0; i < 10; i++)
   {
   Task.Factory.StartNew(() =>
   {
   for (int j = 0; j < 10; j++)
   {
   Task.Factory.StartNew(() =>
   {
   // Receiving is very slow and many consumers receive no messages! I don't understand why.
   ReceiveMesage(Thread.CurrentThread.ManagedThreadId.ToString());
   });
   }
   });
   }
   }
   
   private IMessage ReceiveMesage(string threadId)
   {
   //demo code
   
   ConsumerEntity consumerEntity = new ConsumerEntity();
   
   // Check whether the consumer pool already holds a consumer for this threadId;
   // if it does not,
   if (!ConsumerPool.Contains(consumerEntity))
   {
   //create new consumer
   consumerEntity.ThreadId = threadId;
   consumerEntity.Consumer = create Consumer();
   ConsumerPool.Add(consumerEntity);
   }
   
   CancellationToken cancellationToken = new CancellationToken();
   
   // use the consumer
   return consumerEntity.Consumer.Receive(cancellationToken).GetAwaiter().GetResult();
   }





Re: [I] Multi-threaded consumption issues [pulsar-dotpulsar]

2024-03-21 Thread via GitHub


gungod2000 commented on issue #208:
URL: https://github.com/apache/pulsar-dotpulsar/issues/208#issuecomment-2012021907

   An IndexOutOfRangeException will happen.
   Thanks.
   
   Sample code:
   namespace WindowsFormsApp1
   {
   public partial class Form1 : Form
   {
   public Form1()
   {
   InitializeComponent();
   }
   
   private static List> ConsumerPool = new 
List>();
   private void Form1_Load(object sender, EventArgs e)
   {
   // add new consumer into pool
   ConsumerPool.Add(new consumer()...);
   }
   
   private void button1_Click(object sender, EventArgs e)
   {
   
   //start some tasks to receive message
   for (int i = 0; i < 10; i++)
   {
   Task.Factory.StartNew(() =>
   {
   // If a lock is added here it works fine, but it is too slow.
   //get random consumer from pool
   var consumer = ConsumerPool.GetSingleConsumer();
   
   //receive message
   var mesage = consumer.receive();
   });
   }
   }
   }
   }
   





[PR] [improve][fn] Avoid getting stats for a "regex" topic [pulsar]

2024-03-21 Thread via GitHub


jiangpengcheng opened a new pull request, #22320:
URL: https://github.com/apache/pulsar/pull/22320

   
   
   
   
   
   
   
   ### Motivation
   
   While cleaning up a function, it may try to get the stats for a "regex" topic, 
which is useless, and the error log also doesn't print the `existingConsumers`.
   
   ### Modifications
   
   Do not get stats if topic is "regex"
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   - [x] This change is a trivial rework / code cleanup without any test 
coverage.
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/jiangpengcheng/pulsar/pull/29
   
   
   





Re: [PR] [improve][cli] CmdConsume print publishTime And eventTime info. [pulsar]

2024-03-21 Thread via GitHub


aloyszhang merged PR #22308:
URL: https://github.com/apache/pulsar/pull/22308





(pulsar) branch master updated: [improve][cli] CmdConsume print publishTime And eventTime info. (#22308)

2024-03-21 Thread aloyszhang
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 74585b5ae07 [improve][cli] CmdConsume print publishTime And eventTime 
info. (#22308)
74585b5ae07 is described below

commit 74585b5ae07a5ab10d85f9a8bc80e3093b7cff20
Author: atomchen <492672...@qq.com>
AuthorDate: Thu Mar 21 18:29:53 2024 +0800

[improve][cli] CmdConsume print publishTime And eventTime info. (#22308)

Co-authored-by: atomchchen 
---
 .../main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java | 3 +++
 .../org/apache/pulsar/tests/integration/cli/ClientToolTest.java| 7 ---
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
index a7932c732eb..658b34767b5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
@@ -108,6 +108,9 @@ public abstract class AbstractCmdConsume extends 
AbstractCmd {
 data = value.toString();
 }
 
+sb.append("publishTime:[").append(message.getPublishTime()).append("], 
");
+sb.append("eventTime:[").append(message.getEventTime()).append("], ");
+
 String key = null;
 if (message.hasKey()) {
 key = message.getKey();
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
index 571948443b1..0d6b6f1abe4 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java
@@ -86,13 +86,14 @@ public class ClientToolTest extends TopicMessagingBase {
 + "\nError output:\n" + result.getStderr());
 }
 String output = result.getStdout();
-Pattern message = Pattern.compile("- got message -\nkey:\\[null\\], properties:\\[\\], content:(.*)");
+Pattern message = Pattern.compile(
+"- got message -\npublishTime:\\[(.*)\\], eventTime:\\[(.*)\\], key:\\[null\\], "
++ "properties:\\[\\], content:(.*)");
 Matcher matcher = message.matcher(output);
 List received = new ArrayList<>(MESSAGE_COUNT);
 while (matcher.find()) {
-received.add(matcher.group(1));
+received.add(matcher.group(3));
 }
 return received;
 }
-
 }
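
The test change above moves the message content from capture group 1 to capture group 3, because the two new fields add two groups in front of it. A minimal sketch of that parsing, assuming the separator text exactly as it appears in this archived diff; the class name `ConsumeOutputParser` and the sample timestamps are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConsumeOutputParser {
    // Pattern copied from the updated ClientToolTest as shown in the archived diff:
    // group 1 = publishTime, group 2 = eventTime, group 3 = message content.
    private static final Pattern MESSAGE = Pattern.compile(
            "- got message -\npublishTime:\\[(.*)\\], eventTime:\\[(.*)\\], key:\\[null\\], "
                    + "properties:\\[\\], content:(.*)");

    /** Returns the content of every message found in the CLI output. */
    public static List<String> extractContents(String output) {
        List<String> received = new ArrayList<>();
        Matcher matcher = MESSAGE.matcher(output);
        while (matcher.find()) {
            // Content is now the third group, after publishTime and eventTime.
            received.add(matcher.group(3));
        }
        return received;
    }

    public static void main(String[] args) {
        // Hypothetical CLI output; the timestamp values are invented for illustration.
        String output = "- got message -\npublishTime:[1711016993000], eventTime:[0], "
                + "key:[null], properties:[], content:hello\n";
        System.out.println(extractContents(output)); // prints [hello]
    }
}
```

Because `.` does not match a line terminator by default, each greedy `(.*)` stays within its own line, which is why the content group stops at the end of the message line.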



Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-03-21 Thread via GitHub


codelipenghui commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1533581014


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##
@@ -188,58 +274,134 @@ protected CompletableFuture 
isLocalTopicActive() {
 }, brokerService.executor());
 }
 
-protected synchronized CompletableFuture closeProducerAsync() {
-if (producer == null) {
-STATE_UPDATER.set(this, State.Stopped);
+/**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC} 
now.
+ * TODO "PersistentReplicator.replicateEntries" may get a 
NullPointerException if this method and a
+ *   "cursor.readComplete" execute concurrently.
+ */
+@Deprecated

Review Comment:
   I think we can leave a TODO there to refactor the code.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##
@@ -188,58 +274,134 @@ protected CompletableFuture 
isLocalTopicActive() {
 }, brokerService.executor());
 }
 
-protected synchronized CompletableFuture closeProducerAsync() {
-if (producer == null) {
-STATE_UPDATER.set(this, State.Stopped);
+/**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC} 
now.
+ * TODO "PersistentReplicator.replicateEntries" may get a 
NullPointerException if this method and a
+ *   "cursor.readComplete" execute concurrently.
+ */
+@Deprecated

Review Comment:
   If it is still used in `PersistentTopic#checkGC`, I don't think we should deprecate this method. Do we have any alternative for `PersistentTopic#checkGC`?



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##
@@ -64,10 +70,35 @@ public abstract class AbstractReplicator {
 
 protected static final AtomicReferenceFieldUpdater STATE_UPDATER =
 AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, 
State.class, "state");
-private volatile State state = State.Stopped;
-
-protected enum State {
-Stopped, Starting, Started, Stopping
+@VisibleForTesting
+@Getter
+protected volatile State state = State.Stopped;
+
+public enum State {
+/**
+ * This enum has two mean meanings:Init, Stopped.
+ * Regarding the meaning "Stopped", only {@link 
PersistentTopic#checkGC} will call {@link #disconnect},
+ *   so this method only be used by {@link PersistentTopic#checkGC} 
now.
+ * TODO After improving the method {@link #disconnect)}, we should 
rename "Stopped" to "Init".
+ */
+// The internal producer is stopped.
+Stopped,
+// Trying to create a new internal producer.
+Starting,
+// The internal producer has started, and tries copy data.
+Started,
+/**
+ * @Deprecated Only {@link PersistentTopic#checkGC} will call {@link 
#disconnect}, so this method only be
+ *  used by {@link PersistentTopic#checkGC} now.
+ * TODO After improving the method {@link #disconnect)}, this enum 
should be removed.
+ */
+@Deprecated
+// The internal producer is trying to stop.
+Stopping,

Review Comment:
   We still use `Stopping` when disconnecting the replicator. Why do we need to add the `@Deprecated` annotation?
   
   BTW, would it be better to rename `Stopping` and `Stopped` to `Disconnecting` and `Disconnected`? Honestly, it is not easy to understand the difference between stop and terminate, but disconnecting would be much clearer in this case.
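
To make the naming question concrete, here is a minimal sketch of the four-state lifecycle with the suggested names, using the same `AtomicReferenceFieldUpdater` compare-and-set style that the diff shows for `STATE_UPDATER`. The class `ReplicatorStateSketch`, its method names, and the allowed transitions are assumptions for illustration only; this is not the code in `AbstractReplicator`.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class ReplicatorStateSketch {
    // Hypothetical naming that follows the suggestion in the review comment.
    public enum State { Disconnected, Starting, Started, Disconnecting }

    private static final AtomicReferenceFieldUpdater<ReplicatorStateSketch, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(ReplicatorStateSketch.class, State.class, "state");

    // Must be volatile and non-static for the field updater to work.
    private volatile State state = State.Disconnected;

    public State getState() {
        return state;
    }

    /** Disconnected -> Starting; returns false if the replicator is in any other state. */
    public boolean beginStart() {
        return STATE_UPDATER.compareAndSet(this, State.Disconnected, State.Starting);
    }

    /** Starting -> Started, once the internal producer has been created. */
    public boolean completeStart() {
        return STATE_UPDATER.compareAndSet(this, State.Starting, State.Started);
    }

    /** Started -> Disconnecting; only one caller can win this transition. */
    public boolean beginDisconnect() {
        return STATE_UPDATER.compareAndSet(this, State.Started, State.Disconnecting);
    }

    /** Disconnecting -> Disconnected, once the internal producer has been closed. */
    public boolean completeDisconnect() {
        return STATE_UPDATER.compareAndSet(this, State.Disconnecting, State.Disconnected);
    }

    public static void main(String[] args) {
        ReplicatorStateSketch replicator = new ReplicatorStateSketch();
        replicator.beginStart();
        replicator.completeStart();
        replicator.beginDisconnect();
        System.out.println(replicator.getState()); // prints Disconnecting
    }
}
```

Each transition names the state it expects to leave, so a concurrent caller that loses the race gets `false` back instead of silently overwriting the state.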



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##
@@ -188,58 +274,134 @@ protected CompletableFuture 
isLocalTopicActive() {
 }, brokerService.executor());
 }
 
-protected synchronized CompletableFuture closeProducerAsync() {
-if (producer == null) {
-STATE_UPDATER.set(this, State.Stopped);
+/**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC} 
now.
+ * TODO "PersistentReplicator.replicateEntries" may get a 
NullPointerException if this method and a
+ *   "cursor.readComplete" execute concurrently.
+ */
+@Deprecated
+public CompletableFuture disconnect(boolean failIfHasBacklog, 
boolean closeTheStartingProducer) {
+long backlog = getNumberOfEntriesInBacklog();
+if (failIfHasBacklog && backlog > 0) {
+CompletableFuture disconnectFuture = new 
CompletableFuture<>();
+disconnectFuture.completeExceptionally(new 
TopicBusyException("Cannot close a replicator with backlog"));
+if (log.isDebugEnabled()) {
+log.debug("[{}] Replicator disconnect failed since topic has 
backlog", replicatorId);
+}
+return disconnectFuture;
+}
+log.info("[{}] Disconnect replicator at 

(pulsar-site) branch main updated: Adding Pulsar Helm Chart 3.3.1 to index.yaml

2024-03-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new 416c9a8e7711 Adding Pulsar Helm Chart 3.3.1 to index.yaml
416c9a8e7711 is described below

commit 416c9a8e771189870a4cf7eb6dd81429fb33b269
Author: Lari Hotari 
AuthorDate: Thu Mar 21 11:31:07 2024 +0200

Adding Pulsar Helm Chart 3.3.1 to index.yaml
---
 static/charts/index.yaml | 25 -
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/static/charts/index.yaml b/static/charts/index.yaml
index 99fe0fd870e8..6c5b32b9b7d2 100644
--- a/static/charts/index.yaml
+++ b/static/charts/index.yaml
@@ -18,6 +18,29 @@
 apiVersion: v1
 entries:
   pulsar:
+  - apiVersion: v2
+appVersion: 3.0.3
+created: "2024-03-21T11:30:52.258959+02:00"
+dependencies:
+- condition: kube-prometheus-stack.enabled
+  name: kube-prometheus-stack
+  repository: https://prometheus-community.github.io/helm-charts
+  version: 56.x.x
+description: Apache Pulsar Helm chart for Kubernetes
+digest: 01e1822e10811352b2ed25e0b2d6a9d9a90af8efc5d32c82feb9301fab21fc83
+home: https://pulsar.apache.org
+icon: https://pulsar.apache.org/img/pulsar.svg
+kubeVersion: '>=1.21.0-0'
+maintainers:
+- email: d...@pulsar.apache.org
+  name: The Apache Pulsar Team
+name: pulsar
+sources:
+- https://github.com/apache/pulsar
+- https://github.com/apache/pulsar-helm-chart
+urls:
+- https://downloads.apache.org/pulsar/helm-chart/3.3.1/pulsar-3.3.1.tgz
+version: 3.3.1
   - apiVersion: v2
 appVersion: 3.0.2
 created: "2024-02-27T16:00:17.329495+02:00"
@@ -523,4 +546,4 @@ entries:
 urls:
 - 
https://github.com/apache/pulsar-helm-chart/releases/download/pulsar-2.6.0-1/pulsar-2.6.0-1.tgz
 version: 2.6.0-1
-generated: "2024-02-27T16:00:17.312053+02:00"
+generated: "2024-03-21T11:30:52.241885+02:00"



(pulsar) branch master updated: [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306)

2024-03-21 Thread aloyszhang
This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 71598c11637 [fix][client]Fixed getting an incorrect `maxMessageSize` 
value when accessing multiple clusters in the same process (#22306)
71598c11637 is described below

commit 71598c1163730defb9fdea85e813fe863c3fe4d2
Author: atomchen <492672...@qq.com>
AuthorDate: Thu Mar 21 17:30:40 2024 +0800

[fix][client]Fixed getting an incorrect `maxMessageSize` value when 
accessing multiple clusters in the same process (#22306)

Co-authored-by: atomchchen 
---
 .../client/api/SimpleProducerConsumerTest.java |  6 ++--
 .../client/impl/ProducerMemoryLimitTest.java   | 12 
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 18 ++--
 .../client/impl/AbstractBatchMessageContainer.java |  9 --
 .../client/impl/BatchMessageContainerImpl.java | 10 +++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  3 +-
 .../pulsar/client/impl/ConnectionHandler.java  |  7 +
 .../apache/pulsar/client/impl/ConsumerImpl.java|  3 +-
 .../apache/pulsar/client/impl/ProducerImpl.java| 32 ++
 9 files changed, 60 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4536bda907b..4c106d39e7a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -93,13 +93,13 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -3906,11 +3906,11 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 .topic("persistent://my-property/my-ns/my-topic2");
 
 @Cleanup
-Producer producer = producerBuilder.create();
+ProducerImpl producer = 
(ProducerImpl)producerBuilder.create();
 List> futures = new ArrayList<>();
 
 // Asynchronously produce messages
-byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
+byte[] message = new 
byte[producer.getConnectionHandler().getMaxMessageSize() + 1];
 for (int i = 0; i < maxPendingMessages + 10; i++) {
 Future future = producer.sendAsync(message);
 try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index d776fdb0ed9..55a67ae644d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import io.netty.buffer.ByteBufAllocator;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SizeUnit;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -69,10 +69,12 @@ public class ProducerMemoryLimitTest extends 
ProducerConsumerBase {
 .create();
 this.stopBroker();
 try {
-try (MockedStatic mockedStatic = 
Mockito.mockStatic(ClientCnx.class)) {
-mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8);
-producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
-}
+ConnectionHandler connectionHandler = 

Re: [PR] [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process [pulsar]

2024-03-21 Thread via GitHub


aloyszhang merged PR #22306:
URL: https://github.com/apache/pulsar/pull/22306
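
For readers skimming the archive, the problem named in the PR title can be illustrated with a small sketch. The commit's test changes replace the static call `ClientCnx.getMaxMessageSize()` with `producer.getConnectionHandler().getMaxMessageSize()`; the sketch below shows why a process-wide static value goes wrong once two clusters report different limits. All class names and sizes here are hypothetical and do not reproduce the client's implementation.

```java
public class MaxMessageSizeSketch {
    /** Process-wide limit: every new connection overwrites the same static field. */
    public static class StaticLimitConnection {
        private static int maxMessageSize = 5 * 1024 * 1024;

        public StaticLimitConnection(int limitReportedByBroker) {
            maxMessageSize = limitReportedByBroker;
        }

        public static int getMaxMessageSize() {
            return maxMessageSize;
        }
    }

    /** Per-connection limit: each connection remembers what its own broker reported. */
    public static class PerConnectionLimit {
        private final int maxMessageSize;

        public PerConnectionLimit(int limitReportedByBroker) {
            this.maxMessageSize = limitReportedByBroker;
        }

        public int getMaxMessageSize() {
            return maxMessageSize;
        }
    }

    public static void main(String[] args) {
        // Two clusters with different limits (invented values).
        new StaticLimitConnection(1024 * 1024);
        new StaticLimitConnection(10 * 1024 * 1024);
        // The first cluster's limit is lost: the last connection wins.
        System.out.println(StaticLimitConnection.getMaxMessageSize()); // prints 10485760

        PerConnectionLimit clusterA = new PerConnectionLimit(1024 * 1024);
        PerConnectionLimit clusterB = new PerConnectionLimit(10 * 1024 * 1024);
        System.out.println(clusterA.getMaxMessageSize()); // prints 1048576
        System.out.println(clusterB.getMaxMessageSize()); // prints 10485760
    }
}
```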





Re: [PR] [improve][cli] CmdConsume print publishTime And eventTime info. [pulsar]

2024-03-21 Thread via GitHub


chenhongSZ commented on PR #22308:
URL: https://github.com/apache/pulsar/pull/22308#issuecomment-2011734030

   /pulsarbot run-failure-checks





(pulsar-helm-chart) annotated tag pulsar-3.3.1 updated (43ed6f5 -> ac187ad)

2024-03-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to annotated tag pulsar-3.3.1
in repository https://gitbox.apache.org/repos/asf/pulsar-helm-chart.git


*** WARNING: tag pulsar-3.3.1 was modified! ***

from 43ed6f5  (commit)
  to ac187ad  (tag)
 tagging 43ed6f543425059509b0e8593f85811ece179f75 (commit)
 replaces pulsar-3.3.0
  by Lari Hotari
  on Thu Mar 21 11:28:07 2024 +0200

- Log -
Apache Pulsar Helm Chart 3.3.1
-----BEGIN PGP SIGNATURE-----

iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmX7/agACgkQ0/pn1SLF
UlZ5YBAAtAjZRhTHtwhSj/IBl8ylyAMP03Ex3Cinvdg+WbwiItd0UNsKq0N3LW3b
58IXpq11kgxN4OkNLrhyFRo1zXZQE2IPkf/Gg0Wm/o216sDnG69lFG3OAxh2Fdf3
8gdIEedk20BiZKLNuqwZxmkjrlCiyeSWU4KctRkrlliISfxyWJfHKNRaFSWeVDma
KPBWi1v0hedJiLveuuccd1yeOkFKsUnzBsDhd8RiGQS3qq2xI83skhrm1W3KAHHN
93sDmmm8fSqWUAdoPS450vw3GW1YqcuTixeBO4x8NhS+1M25WvmNGJRSoOWFN6V+
fjEowd6qvvi5CD7S6JmuV7tT9o7q30xx/FbYh2eA5FL1L4S+vqQyhf4L28ehD555
TZdJbRaZmLie2ISBtnNgkg0kcD8accKzmkWt6OSIq41ML4G777KSemhBqT4AjYb1
oi+apuGQy3o9+DD3NmYq+kLGoqInFqKwocAwCSVkOEC/y3NTcvo1Y2sWE6alv2Kz
ltJbEA3Tf1nNOKAOGeZ1ec5HJ535DHfdhHbE9yACS38iJ3scW17edZYGZ8exh/ur
J93C2BfcTuPrSRHK+TFX/FUtwx7hQY1Pf1dHY8SNg+CzgEpxBs1zpuq1PBPpC55q
vI4RRTC1vIJ9Uxuvwtv2/C3pORI+oeiwt4SqgWMYd/x2qGrW8kw=
=7gg2
-----END PGP SIGNATURE-----
---


No new revisions were added by this update.

Summary of changes:



svn commit: r68036 - /dev/pulsar/helm-chart/3.3.1-candidate-1/ /release/pulsar/helm-chart/3.3.1/

2024-03-21 Thread lhotari
Author: lhotari
Date: Thu Mar 21 09:27:55 2024
New Revision: 68036

Log:
Release Pulsar Helm Chart 3.3.1 from 3.3.1-candidate-1

Added:
release/pulsar/helm-chart/3.3.1/
  - copied from r68035, dev/pulsar/helm-chart/3.3.1-candidate-1/
Removed:
dev/pulsar/helm-chart/3.3.1-candidate-1/



svn commit: r68035 - /dev/pulsar/helm-chart/3.3.1-candidate-1/index.yaml

2024-03-21 Thread lhotari
Author: lhotari
Date: Thu Mar 21 09:27:42 2024
New Revision: 68035

Log:
Remove temporary index.yaml file

Removed:
dev/pulsar/helm-chart/3.3.1-candidate-1/index.yaml



[I] [Bug] Broker startup could get into a long crash loop when Functions are enabled [pulsar-helm-chart]

2024-03-21 Thread via GitHub


lhotari opened a new issue, #473:
URL: https://github.com/apache/pulsar-helm-chart/issues/473

   **Describe the bug**
   
   On pulsar-broker-0, this is the last entry in the logs:
   ```
   Caused by: java.net.UnknownHostException: pulsar-broker-2.pulsar-broker.default.svc.cluster.local: Name or service not known
   at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method) ~[?:?]
   at 
java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:934)
 ~[?:?]
   at 
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1543) ~[?:?]
   at java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:852) 
~[?:?]
   at java.net.InetAddress.getAllByName0(InetAddress.java:1533) ~[?:?]
   at java.net.InetAddress.getAllByName(InetAddress.java:1385) ~[?:?]
   at java.net.InetAddress.getAllByName(InetAddress.java:1306) ~[?:?]
   at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:169) 
~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:166) 
~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at 
java.security.AccessController.doPrivileged(AccessController.java:569) ~[?:?]
   at 
io.netty.util.internal.SocketUtils.allAddressesByName(SocketUtils.java:166) 
~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.resolver.DefaultNameResolver.doResolveAll(DefaultNameResolver.java:50) 
~[io.netty-netty-resolver-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:79) 
~[io.netty-netty-resolver-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:71) 
~[io.netty-netty-resolver-4.1.100.Final.jar:4.1.100.Final]
   at 
org.asynchttpclient.resolver.RequestHostnameResolver.resolve(RequestHostnameResolver.java:50)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.request.NettyRequestSender.resolveAddresses(NettyRequestSender.java:357)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.request.NettyRequestSender.sendRequestWithNewChannel(NettyRequestSender.java:300)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.request.NettyRequestSender.sendRequestWithCertainForceConnect(NettyRequestSender.java:142)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.request.NettyRequestSender.sendRequest(NettyRequestSender.java:113)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.request.NettyRequestSender.sendNextRequest(NettyRequestSender.java:548)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.handler.intercept.Redirect30xInterceptor.exitAfterHandlingRedirect(Redirect30xInterceptor.java:160)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.handler.intercept.Interceptors.exitAfterIntercept(Interceptors.java:98)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.handler.HttpHandler.handleHttpResponse(HttpHandler.java:78)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:140) 
~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78)
 ~[org.asynchttpclient-async-http-client-2.12.1.jar:?]
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 ~[io.netty-netty-codec-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final]
   at 

[PR] [improve][misc] Include native epoll library for Netty for arm64 [pulsar]

2024-03-21 Thread via GitHub


lhotari opened a new pull request, #22319:
URL: https://github.com/apache/pulsar/pull/22319

   ### Motivation
   
   When running Pulsar in Docker on macOS with Apple Silicon (M3), I get this error message:
   ```
   Caused by: java.io.FileNotFoundException: META-INF/native/libnetty_transport_native_epoll_aarch_64.so
   at 
io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:186) 
~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:323) 
~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
   at io.netty.channel.epoll.Native.(Native.java:85) 
~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
   at io.netty.channel.epoll.Epoll.(Epoll.java:40) 
~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
   at 
org.apache.pulsar.common.util.netty.EventLoopUtil.newEventLoopGroup(EventLoopUtil.java:59)
 ~[org.apache.pulsar-pulsar-common-3.0.3.jar:3.0.3]
   at org.apache.pulsar.broker.PulsarService.(PulsarService.java:337) 
~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3]
   at 
org.apache.pulsar.PulsarBrokerStarter$BrokerStarter.(PulsarBrokerStarter.java:204)
 ~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3]
   at 
org.apache.pulsar.PulsarBrokerStarter.main(PulsarBrokerStarter.java:333) 
~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3]
   Suppressed: java.lang.UnsatisfiedLinkError: no 
netty_transport_native_epoll_aarch_64 in java.library.path: 
/usr/java/packages/lib:/usr/lib64:/lib64:/lib:/usr/lib
   at java.lang.ClassLoader.loadLibrary(ClassLoader.java:2434) ~[?:?]
   at java.lang.Runtime.loadLibrary0(Runtime.java:818) ~[?:?]
   at java.lang.System.loadLibrary(System.java:1993) ~[?:?]
   at 
io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38) 
~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:396)
 ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:161) 
~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:323) 
~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
   at io.netty.channel.epoll.Native.(Native.java:85) 
~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
   at io.netty.channel.epoll.Epoll.(Epoll.java:40) 
~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
   at 
org.apache.pulsar.common.util.netty.EventLoopUtil.newEventLoopGroup(EventLoopUtil.java:59)
 ~[org.apache.pulsar-pulsar-common-3.0.3.jar:3.0.3]
   at 
org.apache.pulsar.broker.PulsarService.(PulsarService.java:337) 
~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3]
   at 
org.apache.pulsar.PulsarBrokerStarter$BrokerStarter.(PulsarBrokerStarter.java:204)
 ~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3]
   at 
org.apache.pulsar.PulsarBrokerStarter.main(PulsarBrokerStarter.java:333) 
~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3]
   Suppressed: java.lang.UnsatisfiedLinkError: no 
netty_transport_native_epoll_aarch_64 in java.library.path: 
/usr/java/packages/lib:/usr/lib64:/lib64:/lib:/usr/lib
   at java.lang.ClassLoader.loadLibrary(ClassLoader.java:2434) 
~[?:?]
   at java.lang.Runtime.loadLibrary0(Runtime.java:818) ~[?:?]
   at java.lang.System.loadLibrary(System.java:1993) ~[?:?]
   at 
io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38) 
~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) ~[?:?]
   at 
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 ~[?:?]
   at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
   at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
   at 
io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:430) 
~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at 
java.security.AccessController.doPrivileged(AccessController.java:318) ~[?:?]
   at 
io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:422)
 ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:388)
 ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   at 
io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:161) 
~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
   

Re: [PR] Add documents for the batching arguments when creating producer [pulsar-client-python]

2024-03-21 Thread via GitHub


merlimat merged PR #205:
URL: https://github.com/apache/pulsar-client-python/pull/205





(pulsar-client-python) branch main updated: Add documents for the batching arguments when creating producer (#205)

2024-03-21 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
 new 2a8819d  Add documents for the batching arguments when creating 
producer (#205)
2a8819d is described below

commit 2a8819def9a2b5eebdd6c2260ff3cbad6f0b1ef1
Author: Yunze Xu 
AuthorDate: Thu Mar 21 17:00:05 2024 +0800

Add documents for the batching arguments when creating producer (#205)
---
 pulsar/__init__.py | 14 ++
 1 file changed, 14 insertions(+)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index a46c209..9590fa3 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -670,6 +670,20 @@ class Client:
 
 SNAPPY is supported since Pulsar 2.4. Consumers will need to be at 
least at that release in order to
 be able to receive messages compressed with SNAPPY.
+batching_enabled: bool, default=False
+When automatic batching is enabled, multiple calls to `send` can 
result in a single batch to be sent to the
+broker, leading to better throughput, especially when publishing 
small messages.
+All messages in a batch will be published as a single batched 
message. The consumer will be delivered
+individual messages in the batch in the same order they were 
enqueued.
+batching_max_messages: int, default=1000
+When you set this option to a value greater than 1, messages are 
queued until this threshold or
+`batching_max_allowed_size_in_bytes` is reached or batch interval 
has elapsed.
+batching_max_allowed_size_in_bytes: int, default=128*1024
+When you set this option to a value greater than 1, messages are 
queued until this threshold or
+`batching_max_messages` is reached or batch interval has elapsed.
+batching_max_publish_delay_ms: int, default=10
+The batch interval in milliseconds. Queued messages will be sent 
in batch after this interval even if both
+the threshold of `batching_max_messages` and 
`batching_max_allowed_size_in_bytes` are not reached.
 max_pending_messages: int, default=1000
 Set the max size of the queue holding the messages pending to 
receive an acknowledgment from the broker.
 max_pending_messages_across_partitions: int, default=5
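
The three batching thresholds documented above interact as "flush on whichever comes first". The following is a minimal model of that rule, written in Java to match the other sketches in this digest. It is not the Python client's implementation: the class `BatchFlushSketch`, its methods, and the use of `>=` for each comparison are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlushSketch {
    private final int maxMessages;    // models batching_max_messages
    private final long maxBytes;      // models batching_max_allowed_size_in_bytes
    private final long maxDelayMs;    // models batching_max_publish_delay_ms
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes;
    private long firstEnqueueMs;

    public BatchFlushSketch(int maxMessages, long maxBytes, long maxDelayMs) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
        this.maxDelayMs = maxDelayMs;
    }

    /** Queues a message; returns the flushed batch if a count or size threshold is reached. */
    public List<byte[]> add(byte[] payload, long nowMs) {
        if (pending.isEmpty()) {
            firstEnqueueMs = nowMs; // the batch interval starts with the first queued message
        }
        pending.add(payload);
        pendingBytes += payload.length;
        if (pending.size() >= maxMessages || pendingBytes >= maxBytes) {
            return flush();
        }
        return new ArrayList<>();
    }

    /** Timer tick; flushes whatever is queued once the batch interval has elapsed. */
    public List<byte[]> tick(long nowMs) {
        if (!pending.isEmpty() && nowMs - firstEnqueueMs >= maxDelayMs) {
            return flush();
        }
        return new ArrayList<>();
    }

    private List<byte[]> flush() {
        List<byte[]> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        return batch;
    }

    public static void main(String[] args) {
        // Defaults taken from the docstring: 1000 messages, 128 KiB, 10 ms.
        BatchFlushSketch batcher = new BatchFlushSketch(1000, 128 * 1024, 10);
        batcher.add(new byte[16], 0);
        System.out.println(batcher.tick(5).size());  // prints 0
        System.out.println(batcher.tick(10).size()); // prints 1
    }
}
```

With small messages and the documented defaults, the 10 ms interval is usually the threshold that triggers a flush unless the send rate is high enough to reach 1000 messages first.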



Re: [I] Document for the batching arguments when creating producer [pulsar-client-python]

2024-03-21 Thread via GitHub


merlimat closed issue #187: Document for the batching arguments when creating 
producer
URL: https://github.com/apache/pulsar-client-python/issues/187





Re: [PR] Add support for HasMessageAvailable [pulsar-dotpulsar]

2024-03-21 Thread via GitHub


blankensteiner commented on PR #207:
URL: https://github.com/apache/pulsar-dotpulsar/pull/207#issuecomment-2011678421

   Hi @smbecker and thank you for the PR :-)
   Regarding "bool HasReachedEndOfTopic();": this is a state and could be checked that way.
   Regarding "ValueTask HasMessageAvailable(CancellationToken cancellationToken = default);": I'm not sure why it is needed or why it is async. If you want to call "HasMessageAvailable" to make sure that Receive will not block, that cannot be guaranteed: HasMessageAvailable might return true, but before you get to call Receive the Reader/Consumer may have lost the connection or faulted. Is what you really need something like this:
   bool TryReceive(out Message message)?
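
The check-then-act concern in this comment can be sketched as follows. DotPulsar is a C# library, so this is only a Java analogue to match the other sketches in this digest: `Optional<T> tryReceive()` stands in for the proposed `bool TryReceive(out Message message)`, and the class `TryReceiveSketch` with its in-memory buffer is hypothetical.

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TryReceiveSketch<T> {
    private final Queue<T> buffer = new ConcurrentLinkedQueue<>();

    /** Simulates a message arriving from the broker. */
    public void deliver(T message) {
        buffer.add(message);
    }

    /** Only a snapshot: the answer may be stale by the time the caller acts on it. */
    public boolean hasMessageAvailable() {
        return !buffer.isEmpty();
    }

    /** Checks and takes in a single step, so there is no gap between the two. */
    public Optional<T> tryReceive() {
        return Optional.ofNullable(buffer.poll());
    }

    public static void main(String[] args) {
        TryReceiveSketch<String> reader = new TryReceiveSketch<>();
        System.out.println(reader.tryReceive().isPresent()); // prints false
        reader.deliver("m1");
        System.out.println(reader.tryReceive().get());       // prints m1
    }
}
```

A caller that uses `hasMessageAvailable()` followed by a blocking receive still has a window between the two calls; `tryReceive()` removes that window because it never blocks and reports absence directly.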





Re: [I] SendChannel.Completion causes the producer to become Faulted [pulsar-dotpulsar]

2024-03-21 Thread via GitHub


blankensteiner commented on issue #209:
URL: 
https://github.com/apache/pulsar-dotpulsar/issues/209#issuecomment-2011663856

   @kandersen82 could you have a look at this?





Re: [I] Multi-threaded consumption issues [pulsar-dotpulsar]

2024-03-21 Thread via GitHub


blankensteiner commented on issue #208:
URL: 
https://github.com/apache/pulsar-dotpulsar/issues/208#issuecomment-2011662348

   Hi @gungod2000 
   Please fill out the bug report template, providing a full sample and stack 
trace.





Re: [PR] Add documents for the batching arguments when creating producer [pulsar-client-python]

2024-03-21 Thread via GitHub


BewareMyPower commented on PR #205:
URL: 
https://github.com/apache/pulsar-client-python/pull/205#issuecomment-2011602289

   [image: https://github.com/apache/pulsar-client-python/assets/18204803/74560245-4f6d-4a03-ab29-cb782b151ac4]
   





Re: [PR] [improve] [pip] PIP-344 Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName) [pulsar]

2024-03-21 Thread via GitHub


poorbarcode commented on code in PR #22182:
URL: https://github.com/apache/pulsar/pull/22182#discussion_r1533389551


##
pip/pip-344.md:
##
@@ -0,0 +1,115 @@
+# PIP-344: Correct the behavior of the public API 
pulsarClient.getPartitionsForTopic(topicName)
+
+# Background knowledge
+
+**Topic auto-creation**
+- The partitioned topic auto-creation is dependent on 
`pulsarClient.getPartitionsForTopic`
+- It triggers partitioned metadata creation by 
`pulsarClient.getPartitionsForTopic`
+- And triggers the topic partition creation by producers' registration and 
consumers' registration.
+- When calling `pulsarClient.getPartitionsForTopic(topicName)`, Pulsar will 
automatically create the partitioned topic metadata if it does not exist, 
either using `HttpLookupService` or `BinaryProtoLookupService`.
+
+**Current behavior of `pulsarClient.getPartitionsForTopic`**
+| case | broker allow `auto-create` | param allow `create if not exists` | non-partitioned topic | partitioned topic | current behavior |
+| --- | --- | --- | --- | --- | --- |
+| 1 | `true/false` | `true/false` | `exists: true` | | REST API: `partitions: 0`; Binary API: `partitions: 0` |
+| 2 | `true/false` | `true/false` | | `exists: true`, `partitions: 3` | REST API: `partitions: 3`; Binary API: `partitions: 3` |
+| 3 | `true` | `true` | | | REST API: `create new: true`, `partitions: 3`; Binary API: `create new: true`, `partitions: 3` |
+| 4 | `true` | `false` | | | REST API: `create new: false`, `partitions: 0`; Binary API: not supported |
+| 5 | `false` | `true` | | | REST API: `create new: false`, `partitions: 0`; Binary API: `create new: false`, `partitions: 0` |
+
+- Broker allows `auto-create`: see also the config `allowAutoTopicCreation` in `broker.conf`.
+- Param allow `create if not exists`
+  - Regarding the HTTP API `PersistentTopics.getPartitionedMetadata`, it is an optional param named `checkAllowAutoCreation`, and the default value is `false`.
+  - Regarding the `pulsar-admin` API, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`; it always sets the param `checkAllowAutoCreation` to `false`, and this cannot be set manually.
+  - Regarding the client API `HttpLookupService.getPartitionedTopicMetadata`, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`; it always sets the param `checkAllowAutoCreation` to `true`, and this cannot be set manually.
+  - Regarding the client API `BinaryProtoLookupService.getPartitionedTopicMetadata`, it always tries to create partitioned metadata.
+- `REST API & HTTP API`: The 4 ways to get partitioned metadata have only two implementations, so this document refers to the HTTP API `PersistentTopics.getPartitionedMetadata`, `pulsar-admin`, and `HttpLookupService.getPartitionedTopicMetadata` collectively as the HTTP (REST) API, and to `BinaryProtoLookupService.getPartitionedTopicMetadata` as the Binary API.
+
+# Motivation
+
+The param `create if not exists` of the Binary API is always `true`.
+
+- For case 4 of `pulsarClient.getPartitionsForTopic`'s behavior, it always tries to create the partitioned metadata, even though the API name is `getxxx`.
+- For case 5 of `pulsarClient.getPartitionsForTopic`'s behavior, it returns partitioned metadata with `0` partitions even though the topic does not exist. The correct behavior for this case was discussed [here](https://github.com/apache/pulsar/issues/8813).
+- BTW, [flink-connector-pulsar](https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L221-L227) is using this API to create partitioned topic metadata.
+
+# Goals
+
+- Regarding case 4: Add a new API `PulsarClient.getPartitionsForTopic(String, boolean)` that only gets the partitioned topic metadata and does not try to create it. See details below.
+- Regarding case 5: Instead of returning partitioned metadata with `0` partitions, respond with a not-found error when calling `pulsarClient.getPartitionsForTopic(String)` if the topic does not exist.
+
+# Detailed Design
+
+## Public-facing Changes
+
+When you call the public API `pulsarClient.getPartitionsForTopic`, Pulsar will no longer create the partitioned metadata.
+
+### Public API
+**LookupService.java**
+```java
+
+- CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName);
+
++ // A new API that contains an additional param "createIfAutoCreationEnabled".
++ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName, boolean createIfAutoCreationEnabled);

Review Comment:
   Sure, I improved the description of the new APIs.






(pulsar-client-go) branch master updated: [fix] Change the wrong `SourceInstanceStatusData` in SinkInstanceStatus (#1199)

2024-03-21 Thread zike
This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 3693b369 [fix] Change the wrong `SourceInstanceStatusData` in 
SinkInstanceStatus (#1199)
3693b369 is described below

commit 3693b3695e2f072f9506b64b5e3000e5f107070d
Author: jiangpengcheng 
AuthorDate: Thu Mar 21 15:39:08 2024 +0800

[fix] Change the wrong `SourceInstanceStatusData` in SinkInstanceStatus 
(#1199)
---
 pulsaradmin/pkg/utils/sink_status.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsaradmin/pkg/utils/sink_status.go 
b/pulsaradmin/pkg/utils/sink_status.go
index 6cdb091f..a2651959 100644
--- a/pulsaradmin/pkg/utils/sink_status.go
+++ b/pulsaradmin/pkg/utils/sink_status.go
@@ -28,8 +28,8 @@ type SinkStatus struct {
 }
 
 type SinkInstanceStatus struct {
-   InstanceID int  `json:"instanceId"`
-   Status SourceInstanceStatusData `json:"status"`
+   InstanceID int`json:"instanceId"`
+   Status SinkInstanceStatusData `json:"status"`
 }
 
 type SinkInstanceStatusData struct {



Re: [PR] [fix] Change the wrong `SourceInstanceStatusData` in `SinkInstanceStatus` [pulsar-client-go]

2024-03-21 Thread via GitHub


RobertIndie merged PR #1199:
URL: https://github.com/apache/pulsar-client-go/pull/1199





Re: [PR] [improve] [pip] PIP-344 Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName) [pulsar]

2024-03-21 Thread via GitHub


RobertIndie commented on code in PR #22182:
URL: https://github.com/apache/pulsar/pull/22182#discussion_r1533359134


##
pip/pip-344.md:
##
@@ -0,0 +1,115 @@
+# PIP-344: Correct the behavior of the public API 
pulsarClient.getPartitionsForTopic(topicName)
+
+# Background knowledge
+
+**Topic auto-creation**
+- The partitioned topic auto-creation is dependent on 
`pulsarClient.getPartitionsForTopic`
+- It triggers partitioned metadata creation by 
`pulsarClient.getPartitionsForTopic`
+- And triggers the topic partition creation by producers' registration and 
consumers' registration.
+- When calling `pulsarClient.getPartitionsForTopic(topicName)`, Pulsar will 
automatically create the partitioned topic metadata if it does not exist, 
either using `HttpLookupService` or `BinaryProtoLookupService`.
+
+**Current behavior of `pulsarClient.getPartitionsForTopic`**
+| case | broker allow `auto-create` | param allow `create if not exists` | non-partitioned topic | partitioned topic | current behavior |
+| --- | --- | --- | --- | --- | --- |
+| 1 | `true/false` | `true/false` | `exists: true` | | REST API: `partitions: 0`; Binary API: `partitions: 0` |
+| 2 | `true/false` | `true/false` | | `exists: true`, `partitions: 3` | REST API: `partitions: 3`; Binary API: `partitions: 3` |
+| 3 | `true` | `true` | | | REST API: `create new: true`, `partitions: 3`; Binary API: `create new: true`, `partitions: 3` |
+| 4 | `true` | `false` | | | REST API: `create new: false`, `partitions: 0`; Binary API: not supported |
+| 5 | `false` | `true` | | | REST API: `create new: false`, `partitions: 0`; Binary API: `create new: false`, `partitions: 0` |
+
+- Broker allows `auto-create`: see also the config `allowAutoTopicCreation` in `broker.conf`.
+- Param allow `create if not exists`
+  - Regarding the HTTP API `PersistentTopics.getPartitionedMetadata`, it is an optional param named `checkAllowAutoCreation`, and the default value is `false`.
+  - Regarding the `pulsar-admin` API, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`; it always sets the param `checkAllowAutoCreation` to `false`, and this cannot be set manually.
+  - Regarding the client API `HttpLookupService.getPartitionedTopicMetadata`, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`; it always sets the param `checkAllowAutoCreation` to `true`, and this cannot be set manually.
+  - Regarding the client API `BinaryProtoLookupService.getPartitionedTopicMetadata`, it always tries to create partitioned metadata.
+- `REST API & HTTP API`: The 4 ways to get partitioned metadata have only two implementations, so this document refers to the HTTP API `PersistentTopics.getPartitionedMetadata`, `pulsar-admin`, and `HttpLookupService.getPartitionedTopicMetadata` collectively as the HTTP (REST) API, and to `BinaryProtoLookupService.getPartitionedTopicMetadata` as the Binary API.
+
+# Motivation
+
+The param `create if not exists` of the Binary API is always `true`.
+
+- For case 4 of `pulsarClient.getPartitionsForTopic`'s behavior, it always tries to create the partitioned metadata, even though the API name is `getxxx`.
+- For case 5 of `pulsarClient.getPartitionsForTopic`'s behavior, it returns partitioned metadata with `0` partitions even though the topic does not exist. The correct behavior for this case was discussed [here](https://github.com/apache/pulsar/issues/8813).
+- BTW, [flink-connector-pulsar](https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L221-L227) is using this API to create partitioned topic metadata.
+
+# Goals
+
+- Regarding case 4: Add a new API `PulsarClient.getPartitionsForTopic(String, boolean)` that only gets the partitioned topic metadata and does not try to create it. See details below.
+- Regarding case 5: Instead of returning partitioned metadata with `0` partitions, respond with a not-found error when calling `pulsarClient.getPartitionsForTopic(String)` if the topic does not exist.
+
+# Detailed Design
+
+## Public-facing Changes
+
+When you call the public API `pulsarClient.getPartitionsForTopic`, Pulsar will no longer create the partitioned metadata.
+
+### Public API
+**LookupService.java**
+```java
+
+- CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName);
+
++ // A new API that contains an additional param "createIfAutoCreationEnabled".
++ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName, boolean createIfAutoCreationEnabled);

Review Comment:
   I think we should document clearly which exceptions will be thrown exactly. 
The exception should also be part of the interface changes, so I think we need 
to explain it clearly in the proposal.
   
   IIUC, this method may throw two types of exceptions: not found and not 
supported. But which specific exception class is used?
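
   To make the question concrete, here is a minimal, self-contained sketch of how the two failure modes could surface to a caller. Everything in it is an assumption for illustration: `LookupSketch`, `TopicNotFoundException`, `NotSupportedException`, and the in-memory map are hypothetical stand-ins, not Pulsar classes, and the proposal may settle on different exception types and a different order of checks.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: none of these types are real Pulsar classes.
class LookupSketch {

    /** Hypothetical "not found" error; the real class is what the review asks about. */
    static class TopicNotFoundException extends RuntimeException {
        TopicNotFoundException(String topic) {
            super("Topic " + topic + " does not exist");
        }
    }

    /** Hypothetical "not supported" error for brokers that cannot skip creation. */
    static class NotSupportedException extends RuntimeException {
        NotSupportedException(String message) {
            super(message);
        }
    }

    // In-memory stand-in for the metadata store: topic name -> partition count.
    private final Map<String, Integer> partitionsByTopic = new ConcurrentHashMap<>();
    private final boolean brokerAllowsAutoCreate;
    private final boolean brokerSupportsLookupWithoutCreate;
    private final int defaultPartitions;

    LookupSketch(boolean brokerAllowsAutoCreate,
                 boolean brokerSupportsLookupWithoutCreate,
                 int defaultPartitions) {
        this.brokerAllowsAutoCreate = brokerAllowsAutoCreate;
        this.brokerSupportsLookupWithoutCreate = brokerSupportsLookupWithoutCreate;
        this.defaultPartitions = defaultPartitions;
    }

    /** Models the proposed two-argument lookup; returns the partition count. */
    CompletableFuture<Integer> getPartitionedTopicMetadata(String topic,
                                                           boolean createIfAutoCreationEnabled) {
        // Assumption: an old broker cannot honor a "do not create" request at all.
        if (!createIfAutoCreationEnabled && !brokerSupportsLookupWithoutCreate) {
            return failed(new NotSupportedException(
                    "Broker cannot look up metadata without creating it"));
        }
        Integer existing = partitionsByTopic.get(topic);
        if (existing != null) {
            return CompletableFuture.completedFuture(existing);
        }
        // Create only when both the caller and the broker allow it.
        if (createIfAutoCreationEnabled && brokerAllowsAutoCreate) {
            partitionsByTopic.put(topic, defaultPartitions);
            return CompletableFuture.completedFuture(defaultPartitions);
        }
        // Otherwise report "not found" instead of returning 0 partitions.
        return failed(new TopicNotFoundException(topic));
    }

    private static <T> CompletableFuture<T> failed(Throwable error) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(error);
        return future;
    }

    /** Returns the underlying failure of a future, or null if it succeeded. */
    static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        LookupSketch lookup = new LookupSketch(true, true, 3);
        System.out.println(failureOf(lookup.getPartitionedTopicMetadata("orders", false)));
        System.out.println(lookup.getPartitionedTopicMetadata("orders", true).join());
    }
}
```

   Under these assumptions a caller can tell the two cases apart by the cause of the failed future, which is why naming the concrete exception classes in the proposal matters.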




(pulsar) branch master updated: [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311)

2024-03-21 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 5cabcacbfa8 [improve][admin] Fix the `createMissingPartitions` doesn't 
response correctly (#22311)
5cabcacbfa8 is described below

commit 5cabcacbfa8874931d501cd040f7a8ac3d6d1923
Author: Jiwei Guo 
AuthorDate: Thu Mar 21 15:24:50 2024 +0800

[improve][admin] Fix the `createMissingPartitions` doesn't response 
correctly (#22311)
---
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java  | 4 +++-
 .../java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java  | 7 +++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 10cf5edd3c3..86993f749b5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -487,7 +487,7 @@ public class PersistentTopicsBase extends AdminResource {
 
 protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) 
{
 getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-if (metadata != null) {
+if (metadata != null && metadata.partitions > 0) {
 CompletableFuture<Void> future = 
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
 NamespaceOperation.CREATE_TOPIC);
 future.thenCompose(__ -> 
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
@@ -497,6 +497,8 @@ public class PersistentTopicsBase extends AdminResource {
 resumeAsyncResponseExceptionally(asyncResponse, e);
 return null;
 });
+} else {
+throw new RestException(Status.NOT_FOUND, String.format("Topic 
%s does not exist", topicName));
 }
 }).exceptionally(ex -> {
 // If the exception is not redirect exception we need to log it.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 23cb413614f..9a292175caa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -1779,4 +1780,10 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 assertTrue(namespaces.contains(ns1V2));
 assertTrue(namespaces.contains(ns1V1));
 }
+
+@Test
+public void testCreateMissingPartitions() throws Exception {
+String topicName = "persistent://" + testTenant + "/" + 
testNamespaceLocal + "/testCreateMissingPartitions";
+assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics().createMissedPartitions(topicName));
+}
 }
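
The guard added in this commit can be illustrated in isolation. The following is a minimal sketch, not the broker code: `Metadata` and `NotFoundException` are hypothetical stand-ins for `PartitionedTopicMetadata` and `RestException(Status.NOT_FOUND, ...)`, and the actual partition creation is reduced to returning the partition count.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Illustrative sketch only: stand-ins for the broker types touched by the diff.
class CreateMissedPartitionsSketch {

    /** Stand-in for PartitionedTopicMetadata; only the partition count matters here. */
    static class Metadata {
        final int partitions;

        Metadata(int partitions) {
            this.partitions = partitions;
        }
    }

    /** Stand-in for RestException(Status.NOT_FOUND, ...). */
    static class NotFoundException extends RuntimeException {
        NotFoundException(String message) {
            super(message);
        }
    }

    /**
     * Mirrors the new guard: proceed only when metadata exists and has at
     * least one partition; otherwise fail with "not found".
     */
    static CompletableFuture<Integer> createMissedPartitions(
            String topicName, CompletableFuture<Metadata> metadataFuture) {
        return metadataFuture.thenApply(metadata -> {
            if (metadata != null && metadata.partitions > 0) {
                // The broker would create the missing partitions here.
                return metadata.partitions;
            }
            throw new NotFoundException(
                    String.format("Topic %s does not exist", topicName));
        });
    }

    /** Returns the underlying failure of a future, or null if it succeeded. */
    static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Metadata> missing =
                CompletableFuture.completedFuture(new Metadata(0));
        System.out.println(failureOf(createMissedPartitions("my-topic", missing)));
    }
}
```

With the old `metadata != null` check alone, a lookup that reports `0` partitions would have passed the guard, which is the situation the new test `testCreateMissingPartitions` exercises.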



Re: [PR] [improve][admin] Fix the `createMissingPartitions` doesn't response correctly [pulsar]

2024-03-21 Thread via GitHub


Technoboy- merged PR #22311:
URL: https://github.com/apache/pulsar/pull/22311





Re: [PR] [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process [pulsar]

2024-03-21 Thread via GitHub


chenhongSZ commented on PR #22306:
URL: https://github.com/apache/pulsar/pull/22306#issuecomment-2011306009

   /pulsarbot run-failure-checks

