Re: [PR] [fix][broker] Create new ledger after the current ledger is closed [pulsar]
codelipenghui merged PR #22034: URL: https://github.com/apache/pulsar/pull/22034 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [fix][broker] Create new ledger after the current ledger is closed (#22034)
This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d0ca9835cf9 [fix][broker] Create new ledger after the current ledger is closed (#22034) d0ca9835cf9 is described below commit d0ca9835cf972ce156bd4a1fc5d109482330857d Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Fri Mar 22 11:52:47 2024 +0800 [fix][broker] Create new ledger after the current ledger is closed (#22034) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 22 ++-- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 33 -- .../mledger/impl/ManagedLedgerFactoryTest.java | 2 +- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 111 +++- .../mledger/impl/NonDurableCursorTest.java | 17 ++-- .../mledger/impl/ShadowManagedLedgerImplTest.java | 5 +- .../broker/service/BacklogQuotaManagerTest.java| 13 ++- .../broker/service/BrokerBkEnsemblesTests.java | 12 +-- .../broker/service/BrokerBookieIsolationTest.java | 112 ++--- .../broker/service/ConsumedLedgersTrimTest.java| 6 +- .../client/impl/ProducerConsumerInternalTest.java | 44 12 files changed, 276 insertions(+), 103 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 9c3598f46ef..8b13fc0f342 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1274,7 +1274,7 @@ public class ManagedCursorImpl implements ManagedCursor { if (proposedReadPosition.equals(PositionImpl.EARLIEST)) { newReadPosition = ledger.getFirstPosition(); } else if 
(proposedReadPosition.equals(PositionImpl.LATEST)) { -newReadPosition = ledger.getLastPosition().getNext(); +newReadPosition = ledger.getNextValidPosition(ledger.getLastPosition()); } else { newReadPosition = proposedReadPosition; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c839ee6f77c..1c0a0465507 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1754,10 +1754,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); -if (!pendingAddEntries.isEmpty()) { -// Need to create a new ledger to write pending entries -createLedgerAfterClosed(); -} +createLedgerAfterClosed(); } @Override @@ -1812,7 +1809,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } ledgerClosed(lh); -createLedgerAfterClosed(); } }, null); } @@ -2649,7 +2645,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } else { PositionImpl slowestReaderPosition = cursors.getSlowestReaderPosition(); if (slowestReaderPosition != null) { -slowestReaderLedgerId = slowestReaderPosition.getLedgerId(); +// The slowest reader position is the mark delete position. +// If the slowest reader position point the last entry in the ledger x, +// the slowestReaderLedgerId should be x + 1 and the ledger x could be deleted. 
+LedgerInfo ledgerInfo = ledgers.get(slowestReaderPosition.getLedgerId()); +if (ledgerInfo != null && ledgerInfo.getLedgerId() != currentLedger.getId() +&& ledgerInfo.getEntries() == slowestReaderPosition.getEntryId() + 1) { +slowestReaderLedgerId = slowestReaderPosition.getLedgerId() + 1; +} else { +slowestReaderLedgerId = slowestReaderPosition.getLedgerId(); +} } else { promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position")); trimmerMutex.unlock(); @@ -3693,7 +3698,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { PositionImpl skippedPosition = position.getPositionAfterEntries(skippedEntryNum);
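The trimming change in the diff above can be illustrated in isolation. The following is a minimal sketch, not the actual `ManagedLedgerImpl` code: the class name, the method signature, and the `Map` of ledger entry counts are hypothetical stand-ins for `ledgers`, `currentLedger`, and `slowestReaderPosition`. It assumes the rule stated in the diff comment: when the mark-delete position points at the last entry of a closed ledger x, the slowest reader ledger id becomes x + 1, so ledger x can be trimmed.

```java
import java.util.Map;
import java.util.TreeMap;

public class SlowestReaderLedgerSketch {

    /**
     * Hypothetical helper mirroring the condition added in the diff.
     *
     * @param ledgerEntryCounts  ledgerId -> number of entries in that ledger (stand-in for LedgerInfo)
     * @param currentLedgerId    the ledger currently open for writes
     * @param markDeleteLedgerId ledger id of the slowest reader's mark-delete position
     * @param markDeleteEntryId  entry id of the slowest reader's mark-delete position
     */
    static long slowestReaderLedgerId(Map<Long, Long> ledgerEntryCounts, long currentLedgerId,
                                      long markDeleteLedgerId, long markDeleteEntryId) {
        Long entries = ledgerEntryCounts.get(markDeleteLedgerId);
        // Fully consumed means: the ledger is known, it is not the ledger being written to,
        // and the mark-delete position is its last entry (entry ids start at 0).
        boolean fullyConsumed = entries != null
                && markDeleteLedgerId != currentLedgerId
                && entries == markDeleteEntryId + 1;
        return fullyConsumed ? markDeleteLedgerId + 1 : markDeleteLedgerId;
    }

    public static void main(String[] args) {
        Map<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(3L, 10L); // closed ledger with entries 0..9
        ledgers.put(4L, 5L);  // current ledger
        System.out.println(slowestReaderLedgerId(ledgers, 4L, 3L, 9L));
        System.out.println(slowestReaderLedgerId(ledgers, 4L, 3L, 4L));
    }
}
```

In this sketch the current ledger is never reported as fully consumed, matching the `ledgerInfo.getLedgerId() != currentLedger.getId()` guard in the diff.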
[PR] [improve][fn] Support OAuth2 in Go instance [pulsar]
jiangpengcheng opened a new pull request, #22323: URL: https://github.com/apache/pulsar/pull/22323 Fixes #22322 Main Issue: #xyz PIP: #xyz ### Motivation The Go instance doesn't support OAuth2 authentication yet; it would be better to support it. ### Modifications Support OAuth2 in the Go instance. ### Verifying this change - [x] Make sure that the change passes the CI checks. - [x] This change is a trivial rework / code cleanup without any test coverage. ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: https://github.com/jiangpengcheng/pulsar/pull/30
[I] Support OAuth2 authentication in Go instance of pulsar functions [pulsar]
jiangpengcheng opened a new issue, #22322: URL: https://github.com/apache/pulsar/issues/22322 ### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Motivation The Pulsar Go client already supports OAuth2; it would be better to support OAuth2 in the Go instance too. ### Solution _No response_ ### Alternatives _No response_ ### Anything else? _No response_ ### Are you willing to submit a PR? - [X] I'm willing to submit a PR!
Re: [PR] [fix][broker] Fix ResourceGroups loading [pulsar]
nodece commented on PR #21781: URL: https://github.com/apache/pulsar/pull/21781#issuecomment-2014266913 /pulsarbot rerun-failure-checks
(pulsar-client-python) annotated tag v3.5.0-candidate-1 updated (13c8c4e -> a094fdd)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a change to annotated tag v3.5.0-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git *** WARNING: tag v3.5.0-candidate-1 was modified! *** from 13c8c4e (commit) to a094fdd (tag) tagging 13c8c4e621737395d6b9648e2bf0150d66eedbe6 (commit) by Yunze Xu on Fri Mar 22 10:15:15 2024 +0800 - Log - Release v3.5.0 candidate 1 -BEGIN PGP SIGNATURE- iQJDBAABCgAtFiEEn+m0+KLf1EiRy6J0Qrtq+2zSb6YFAmX86bMPHHh5ekBhcGFj aGUub3JnAAoJEEK7avts0m+mSusQAIGcZb6wL1LhQDRfAIcV+1RJRTjWeSXs5+Ha 6cD++IFac3w9+fbKYu9FBzaqG5FzPXWwSbO618OWYPR0BgOrKFYS5+mGWjEZFIaZ VK7pXvMrY6qHBSPWtXuGvqE9dVzB8anzo2Bm8XeuZbYKSJUPaqpuSmLtP0LgKgH/ hHski8stKFAwPFXl+erPOJKra9LeNTswGysnQQyqtuMkDTLkfLtpmET55VqeyTQc DTB8gJxUCJxMsMVJ/qNxcjDvOc3vHeH6lHmEE87CPvaza8BboQ8MP9qU8tsueqZB QYdCxbJyPNjjk+YWfdKziybOmzG/9JneZN8TB1vQJqnnp6RwbuAWpWtvVqJC9cU0 7PKJq9GpBY7cq6sK4rwMddx47lJbM9afQawW2C7TefEzthgQZGqr7IgNUpL0d5H7 uIlvcdLtB46Layt0i5ECnIhCdtOJJFc3uz9c1LneRKAArfefhbr1Hni94MK5HkHy noPMyFgiTPvnXHQUipzNRsN6uIbP3fHnUKZ9nqZIL/0lz1t459vFOhMpusAm4AgP anku6oqaZ2NHI9geVDhPJWEsRnEyS/ohB+2FV2QbUt+bIRTXUuXkjSQbiATN74qi LEJnLSCyQ7mu/7uzsKzrF1gxJJVP4XF0z1dSOIiDgOxJBgwYCqMtKYn4ipMVudLd mPtNPSVC =/agE -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar-client-python) branch branch-3.5 created (now 13c8c4e)
xyz pushed a change to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git at 13c8c4e Bump version to 3.5.0 This branch includes the following new commits: new 13c8c4e Bump version to 3.5.0 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(pulsar-client-python) 01/01: Bump version to 3.5.0
xyz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git commit 13c8c4e621737395d6b9648e2bf0150d66eedbe6 Author: Yunze Xu AuthorDate: Fri Mar 22 10:14:44 2024 +0800 Bump version to 3.5.0 --- pulsar/__about__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/__about__.py b/pulsar/__about__.py index e891b1b..aca9e7b 100644 --- a/pulsar/__about__.py +++ b/pulsar/__about__.py @@ -16,4 +16,4 @@ # specific language governing permissions and limitations # under the License. # -__version__='3.5.0a1' +__version__='3.5.0'
(pulsar-site) branch main updated: Docs sync done from apache/pulsar (#69c45ad)
This is an automated email from the ASF dual-hosted git repository. urfree pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new 2c00c5e224e3 Docs sync done from apache/pulsar (#69c45ad) 2c00c5e224e3 is described below commit 2c00c5e224e383aecaff6dc68e4d81550911c894 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Mar 22 01:32:12 2024 + Docs sync done from apache/pulsar (#69c45ad) --- static/reference/next/pulsar-perf/pulsar-perf.md | 290 +++ 1 file changed, 145 insertions(+), 145 deletions(-) diff --git a/static/reference/next/pulsar-perf/pulsar-perf.md b/static/reference/next/pulsar-perf/pulsar-perf.md index 004c75bf2421..dd67609564a7 100644 --- a/static/reference/next/pulsar-perf/pulsar-perf.md +++ b/static/reference/next/pulsar-perf/pulsar-perf.md @@ -9,62 +9,62 @@ $ pulsar-perf produce [options] |Flag|Description|Default| |---|---|---| -| `-ch, --chunking` | Should split the message and publish in chunks if message size is larger than allowed max size|false| -| `-threads, --num-test-threads` | Number of test threads|1| -| `--separator` | Separator between the topic and topic number|-| -| `-au, --admin-url` | Pulsar Admin URL|null| -| `-db, --disable-batching` | Disable batching if true|false| +| `-h, --help` | Print help message|false| | `-cf, --conf-file` | Pulsar configuration file|null| -| `-pn, --producer-name` | Producer Name|null| -| `-fc, --format-class` | Custom Formatter class name|org.apache.pulsar.testclient.DefaultMessageFormatter| -| `-k, --encryption-key-name` | The public key name to encrypt payload|null| -| `-nmt, --numMessage-perTransaction` | The number of messages sent by a transaction. 
(After --txn-enable setting to true, -nmt takes effect)|50| -| `--tls-enable-hostname-verification` | Enable TLS hostname verification|null| +| `-u, --service-url` | Pulsar Service URL|null| +| `--auth-plugin` | Authentication plugin class name|null| +| `--auth-params` | Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}".|null| +| `--trust-cert-file` | Path for the trusted TLS certificate file|| | `--tls-allow-insecure` | Allow insecure TLS connection|null| -| `-bw, --busy-wait` | Enable Busy-Wait on the Pulsar client|false| +| `--tls-enable-hostname-verification` | Enable TLS hostname verification|null| +| `-c, --max-connections` | Max number of TCP connections to a single broker|1| | `-i, --stats-interval-seconds` | Statistics Interval Seconds. If 0, statistics will be disabled|0| -| `-np, --partitions` | Create partitioned topics with the given number of partitions, set 0 to not try to create the topic|null| -| `-txn, --txn-enable` | Enable or disable the transaction|false| -| `-fp, --format-payload` | Format %i as a message index in the stream from producer and/or %t as the timestamp nanoseconds.|false| +| `-ioThreads, --num-io-threads` | Set the number of threads to be used for handling connections to brokers. 
The default value is 1.|1| +| `-bw, --busy-wait` | Enable Busy-Wait on the Pulsar client|false| +| `--listener-name` | Listener name for the broker.|null| +| `-lt, --num-listener-threads` | Set the number of threads to be used for message listeners|1| +| `-mlr, --max-lookup-request` | Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker|5| +| `--proxy-url` | Proxy-server URL to which to connect.|null| | `--proxy-protocol` | Proxy protocol to select type of routing at proxy.|null| -| `-dr, --delay-range` | Mark messages with a given delay by a random number of seconds. this value between the specified origin (inclusive) and the specified bound (exclusive). e.g. 1,300|null| +| `-ml, --memory-limit` | Configure the Pulsar client memory limit (eg: 32M, 64M)|0| | `-t, --num-topics, --num-topic` | Number of topics. Must matchthe given number of topic arguments.|1| -| `-v, --encryption-key-value-file` | The file which contains the public key to encrypt payload|null| -| `-bb, --batch-max-bytes` | Maximum number of bytes per batch|4194304| -| `-u, --service-url` | Pulsar Service URL|null| +| `-threads, --num-test-threads` | Number of test threads|1| | `-r, --rate` | Publish rate msg/s across topics|100| -| `-ef, --exit-on-failure` | Exit from the process on publish failure (default: disable)|false| +| `-s, --size` | Message size (bytes)|1024| +| `-n, --num-producers` | Number of producers (per topic)|1| +| `--separator` | Separator between the topic and topic number|-| +| `--send-timeout` | Set the sendTimeout value default 0 to keep compatibility with previous version of pulsar-perf|0| +| `-pn, --producer-name` | Producer Name|null| +| `-au, --admin-url` | Pulsar
Re: [PR] [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf [pulsar]
Technoboy- merged PR #22303: URL: https://github.com/apache/pulsar/pull/22303
(pulsar) branch master updated: [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf (#22303)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 69c45ad5300 [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf (#22303) 69c45ad5300 is described below commit 69c45ad5300e36a62a923b8eaa58aab99c6e02fb Author: crossoverJie AuthorDate: Fri Mar 22 09:12:37 2024 +0800 [improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-perf (#22303) Co-authored-by: Zixuan Liu --- .../cli/converters/ByteUnitToLongConverter.java| 39 - .../pulsar/cli/converters/ByteConversionTest.java | 9 +- pulsar-testclient/pom.xml | 4 +- .../proxy/socket/client/PerformanceClient.java | 65 +++ .../apache/pulsar/testclient/BrokerMonitor.java| 30 +++ .../testclient/CmdGenerateDocumentation.java | 67 +-- .../pulsar/testclient/LoadSimulationClient.java| 34 .../testclient/LoadSimulationController.java | 68 +++ .../pulsar/testclient/ManagedLedgerWriter.java | 57 ++--- .../testclient/PerformanceBaseArguments.java | 59 +++-- .../pulsar/testclient/PerformanceConsumer.java | 65 --- .../pulsar/testclient/PerformanceProducer.java | 96 +++--- .../pulsar/testclient/PerformanceReader.java | 19 +++-- .../testclient/PerformanceTopicListArguments.java | 10 ++- .../pulsar/testclient/PerformanceTransaction.java | 45 +- ...or.java => PositiveNumberParameterConvert.java} | 15 ++-- .../pulsar/testclient/GenerateDocumentionTest.java | 37 + 17 files changed, 377 insertions(+), 342 deletions(-) diff --git a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java b/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java deleted file mode 100644 index 6170fb489d4..000 --- a/pulsar-cli-utils/src/main/java/org/apache/pulsar/cli/converters/ByteUnitToLongConverter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.cli.converters; - -import static org.apache.pulsar.cli.ValueValidationUtil.emptyCheck; -import com.beust.jcommander.converters.BaseConverter; - -public class ByteUnitToLongConverter extends BaseConverter { - -public ByteUnitToLongConverter(String optionName) { -super(optionName); -} - -@Override -public Long convert(String argStr) { -return parseBytes(argStr); -} - -Long parseBytes(String argStr) { -emptyCheck(getOptionName(), argStr); -return ByteUnitUtil.validateSizeString(argStr); -} -} diff --git a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java index 283e94bfb9c..6e7a2e6d7e3 100644 --- a/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java +++ b/pulsar-cli-utils/src/test/java/org/apache/pulsar/cli/converters/ByteConversionTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.cli.converters; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertThrows; import org.apache.pulsar.cli.converters.picocli.ByteUnitToIntegerConverter; +import 
org.apache.pulsar.cli.converters.picocli.ByteUnitToLongConverter; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import picocli.CommandLine.TypeConversionException; @@ -59,8 +60,8 @@ public class ByteConversionTest { } @Test(dataProvider = "successfulByteUnitUtilTestCases") -public void testSuccessfulByteUnitToLongConverter(String input, long expected) { -ByteUnitToLongConverter converter = new ByteUnitToLongConverter("optionName"); +public void testSuccessfulByteUnitToLongConverter(String input, long expected) throws Exception{ +ByteUnitToLongConverter converter = new ByteUnitToLongConverter();
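For readers unfamiliar with what a byte-unit converter does, here is a rough, self-contained sketch of turning a size string such as `32M` into a number of bytes. It is not the Pulsar `ByteUnitUtil` or the picocli converter from the diff above: the class name, the accepted suffixes (K, M, G, T as powers of 1024), and the error messages are all assumptions made for illustration.

```java
public class ByteUnitSketch {

    /** Hypothetical parser: "1024" -> 1024, "4k" -> 4096, "32M" -> 32 * 1024 * 1024. */
    static long parseBytes(String arg) {
        if (arg == null || arg.trim().isEmpty()) {
            throw new IllegalArgumentException("size must not be empty");
        }
        String s = arg.trim();
        char last = Character.toUpperCase(s.charAt(s.length() - 1));
        long multiplier;
        switch (last) {
            case 'K': multiplier = 1L << 10; break;
            case 'M': multiplier = 1L << 20; break;
            case 'G': multiplier = 1L << 30; break;
            case 'T': multiplier = 1L << 40; break;
            default:  multiplier = 1L; // no recognized suffix: treat the whole string as bytes
        }
        String digits = multiplier == 1L ? s : s.substring(0, s.length() - 1).trim();
        long value;
        try {
            value = Long.parseLong(digits);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid size: " + arg, e);
        }
        if (value < 0) {
            throw new IllegalArgumentException("size must not be negative: " + arg);
        }
        // multiplyExact throws ArithmeticException on overflow instead of wrapping silently.
        return Math.multiplyExact(value, multiplier);
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("32M"));
        System.out.println(parseBytes("1024"));
    }
}
```

A picocli-based converter would typically wrap logic like this and report failures through picocli's own exception type; that wiring is left out here because it depends on the library.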
Re: [PR] [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet [pulsar]
codelipenghui commented on code in PR #22283: URL: https://github.com/apache/pulsar/pull/22283#discussion_r1534887661 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady, "Consumer is already present on the connection"); } else if (existingConsumerFuture.isCompletedExceptionally()){ +log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection," ++ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId); ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true, -String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s", +String.format("A failed consumer with id is already present on the connection." ++ " remoteAddress: %s, subscription: %s", remoteAddress, subscriptionName)); -consumers.remove(consumerId, existingConsumerFuture); +/** + * This feature may was failed due to the client closed a in-progress subscribing. 
+ * See {@link #handleCloseConsumer(CommandCloseConsumer)} + * Do not remove the failed feature at current line, it will be removed after the progress of Review Comment: ```suggestion * Do not remove the future feature at current line, it will be removed after the progress of ``` ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady, "Consumer is already present on the connection"); } else if (existingConsumerFuture.isCompletedExceptionally()){ +log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection," ++ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId); ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true, -String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s", +String.format("A failed consumer with id is already present on the connection." ++ " remoteAddress: %s, subscription: %s", remoteAddress, subscriptionName)); -consumers.remove(consumerId, existingConsumerFuture); +/** + * This feature may was failed due to the client closed a in-progress subscribing. + * See {@link #handleCloseConsumer(CommandCloseConsumer)} + * Do not remove the failed feature at current line, it will be removed after the progress of + * the previous subscribing is done. + * Before the previous subscribing is done, the new subscribe request will always fail. + * This mechanism is in order to prevent more complex logic to handle the race conditions. + */ commandSender.sendErrorResponse(requestId, error, -"Consumer that failed is already present on the connection"); +"A failed consumer is already present on the connection"); Review Comment: Hmm, It looks like we don't need to change it. 
A failed consumer can be understand as a different consumer But if say `Consumer that ...`, it usually means the same consumer ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java: ## @@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady, "Consumer is already present on the connection"); } else if (existingConsumerFuture.isCompletedExceptionally()){ +log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection," ++ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
[PR] [fix][broker] Update TransferShedder underloaded broker check to consider max loaded broker's msgThroughputEMA and update IsExtensibleLoadBalancerImpl check [pulsar]
heesung-sn opened a new pull request, #22321: URL: https://github.com/apache/pulsar/pull/22321 ### Motivation Checking for zero traffic is not a practical way to identify whether a broker is underloaded. There might be some health-checking traffic, which can push the broker's traffic slightly above 1 byte/s. We should instead use a relative measurement to check whether a broker is significantly underloaded. ### Modifications - Update the TransferShedder underloaded broker check to consider the max loaded broker's msgThroughputEMA - Delete `isLoadManagerExtensionEnabled(ServiceConfiguration conf)` so that only the `isLoadManagerExtensionEnabled(PulsarService pulsar)` API is used ### Verifying this change - [x] Make sure that the change passes the CI checks. ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository:
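The motivation above (a relative check instead of a zero-traffic check) can be sketched as follows. This is only an illustration under stated assumptions, not the TransferShedder code: the class name, the `ratio` parameter, and the choice to return `false` when no broker has any traffic are all hypothetical.

```java
public class UnderloadCheckSketch {

    /**
     * Hypothetical relative check: a broker counts as underloaded when its
     * message-throughput EMA is below a fraction of the busiest broker's EMA.
     *
     * @param brokerThroughputEma throughput EMA of the broker being checked (bytes/s)
     * @param allThroughputEmas   throughput EMAs of all brokers, including the checked one
     * @param ratio               assumed threshold fraction, for example 0.1
     */
    static boolean isUnderloaded(double brokerThroughputEma, double[] allThroughputEmas, double ratio) {
        double max = 0.0;
        for (double ema : allThroughputEmas) {
            max = Math.max(max, ema);
        }
        if (max <= 0.0) {
            // No traffic anywhere: there is no meaningful baseline, so nothing is flagged.
            return false;
        }
        return brokerThroughputEma < max * ratio;
    }

    public static void main(String[] args) {
        double[] emas = {5.0, 1000.0};
        // 5 bytes/s of health-check traffic is non-zero but far below 10% of the busiest broker.
        System.out.println(isUnderloaded(5.0, emas, 0.1));
        System.out.println(isUnderloaded(200.0, emas, 0.1));
    }
}
```

The point of the relative form is that a small amount of background traffic no longer hides an idle broker, which is the problem the motivation describes.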
Re: [PR] Add support for HasMessageAvailable [pulsar-dotpulsar]
smbecker commented on PR #207: URL: https://github.com/apache/pulsar-dotpulsar/pull/207#issuecomment-2013677434 > In regards to "ValueTask HasMessageAvailable(CancellationToken cancellationToken = default);" I'm not sure why that is needed and why it is async? My original intent was to replicate something similar to `EndOfStreamAction` from the [pulsar-client-reactive](https://github.com/apache/pulsar-client-reactive/blob/b8b42df975c4bb2cd275936402d6439087695554/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReader.java#L65) project
Re: [PR] Add support for HasMessageAvailable [pulsar-dotpulsar]
smbecker commented on PR #207: URL: https://github.com/apache/pulsar-dotpulsar/pull/207#issuecomment-2013672203 > In regards to "bool HasReachedEndOfTopic();", this is a state and could be checked that way. It is currently not possible to check the current state, which was the purpose of [this change](https://github.com/apache/pulsar-dotpulsar/pull/207/files#diff-dd82a6892bf52f24ca2167d247198fbd63b6126383a09abb28280636ab8303bbR66). Also, this allows a consistent method for checking that covers both readers and consumers, which seems to line up with the purpose of the `IReceive` interface. Should I move the changes to `IStateChanged` to a separate PR? Should I remove this method in favor of just allowing checking the current state? > In regards to "ValueTask HasMessageAvailable(CancellationToken cancellationToken = default);" I'm not sure why that is needed and why it is async? If you want to call "HasMessageAvailable" to make sure that Receive is not blocked, then that cannot be guaranteed, because HasMessageAvailable might return true, but before you get to call Receive the Reader/Consumer can have lost the connection or faulted. Is what you really need something like this: bool TryReceive(out Message message)? I was actually looking for something more similar to `TryReceive` but saw `hasMessageAvailableAsync` in the Java Client. However, it isn't simply "Are there any messages that are already downloaded?"; it also answers the question "Are there any messages that could be downloaded from the broker?". The Java Client actually makes a call to `GetLastMessageId` to determine if the server has any additional messages that could be downloaded. Should I add `TryReceive` that only attempts to receive from what has already been downloaded? Should I update the implementation to be more in line with the Java Client in using `GetLastMessageId`? Please advise...
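The two questions discussed in the comment above ("already downloaded?" versus "could be downloaded from the broker?") can be contrasted in a small sketch. It is written in Java for consistency with the rest of this digest and is not DotPulsar or Java client code: the class, the `LongSupplier` standing in for a broker "get last message id" call, and the use of plain `long` ids are all hypothetical. A real client would make the broker call asynchronously, which is the reason the proposed method returns a task.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.function.LongSupplier;

public class ReaderAvailabilitySketch {
    // Ids of messages that were already downloaded (prefetched) from the broker.
    private final Queue<Long> prefetched = new ArrayDeque<>();
    // Hypothetical stand-in for asking the broker for the topic's last message id.
    private final LongSupplier brokerLastMessageId;
    private long lastReceivedId = -1;

    ReaderAvailabilitySketch(LongSupplier brokerLastMessageId) {
        this.brokerLastMessageId = brokerLastMessageId;
    }

    /** Simulates the client library delivering a message into the local queue. */
    void onPrefetched(long messageId) {
        prefetched.add(messageId);
    }

    /** TryReceive-style call: only looks at what has already been downloaded. */
    Optional<Long> tryReceive() {
        Long id = prefetched.poll();
        if (id == null) {
            return Optional.empty();
        }
        lastReceivedId = id;
        return Optional.of(id);
    }

    /** HasMessageAvailable-style call: also asks whether the broker holds newer messages. */
    boolean hasMessageAvailable() {
        return !prefetched.isEmpty() || brokerLastMessageId.getAsLong() > lastReceivedId;
    }

    public static void main(String[] args) {
        long[] brokerLast = {0}; // the broker holds message 0, nothing is prefetched yet
        ReaderAvailabilitySketch reader = new ReaderAvailabilitySketch(() -> brokerLast[0]);
        System.out.println(reader.hasMessageAvailable());
        System.out.println(reader.tryReceive().isPresent());
    }
}
```

The sketch also shows the reviewer's caveat: `hasMessageAvailable()` returning true says nothing about whether a later receive will succeed without waiting.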
(pulsar) branch master updated: [improve][misc] Include native epoll library for Netty for arm64 (#22319)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 24e9437ce06 [improve][misc] Include native epoll library for Netty for arm64 (#22319) 24e9437ce06 is described below commit 24e9437ce065613fd924a74f61b620d9fdc0058b Author: Lari Hotari AuthorDate: Thu Mar 21 13:23:21 2024 -0700 [improve][misc] Include native epoll library for Netty for arm64 (#22319) --- distribution/server/src/assemble/LICENSE.bin.txt | 1 + distribution/shell/src/assemble/LICENSE.bin.txt | 1 + pulsar-common/pom.xml| 6 ++ 3 files changed, 8 insertions(+) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index dac03d966a6..cb99d62edfe 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -309,6 +309,7 @@ The Apache Software License, Version 2.0 - io.netty-netty-resolver-dns-native-macos-4.1.105.Final-osx-x86_64.jar - io.netty-netty-transport-4.1.105.Final.jar - io.netty-netty-transport-classes-epoll-4.1.105.Final.jar +- io.netty-netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar - io.netty-netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar - io.netty-netty-transport-native-unix-common-4.1.105.Final.jar - io.netty-netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 7ae4241dfb9..2b2f1c26be1 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -360,6 +360,7 @@ The Apache Software License, Version 2.0 - netty-resolver-dns-4.1.105.Final.jar - netty-transport-4.1.105.Final.jar - netty-transport-classes-epoll-4.1.105.Final.jar +- 
netty-transport-native-epoll-4.1.105.Final-linux-aarch_64.jar - netty-transport-native-epoll-4.1.105.Final-linux-x86_64.jar - netty-transport-native-unix-common-4.1.105.Final.jar - netty-transport-native-unix-common-4.1.105.Final-linux-x86_64.jar diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index 1b54a7aee2d..f93a9ef654a 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -99,6 +99,12 @@ linux-x86_64 + + io.netty + netty-transport-native-epoll + linux-aarch_64 + + io.netty netty-transport-native-unix-common
Re: [PR] [improve][misc] Include native epoll library for Netty for arm64 [pulsar]
lhotari merged PR #22319: URL: https://github.com/apache/pulsar/pull/22319 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Fix ResourceGroups loading [pulsar]
nodece commented on code in PR #21781: URL: https://github.com/apache/pulsar/pull/21781#discussion_r1534195770 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java: ## @@ -41,52 +53,73 @@ * @see https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting;>Global-quotas * */ -public class ResourceGroupConfigListener implements Consumer { +public class ResourceGroupConfigListener implements Consumer, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class); private final ResourceGroupService rgService; private final PulsarService pulsarService; private final ResourceGroupResources rgResources; -private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; +private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; +private final ScheduledExecutorService executorService; public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) { this.rgService = rgService; this.pulsarService = pulsarService; this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources(); -loadAllResourceGroups(); this.rgResources.getStore().registerListener(this); -rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener( -rgService, pulsarService, this); +this.executorService = +Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-resource-group-config")); +execute(() -> loadAllResourceGroupsWithRetryAsync(0)); } -private void loadAllResourceGroups() { -rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) -> { -if (ex != null) { -LOG.error("Exception when fetching resource groups", ex); -return; +@SneakyThrows +private void loadAllResourceGroupsWithRetryAsync(long retry) { +loadAllResourceGroupsAsync().exceptionally(e -> { +long nextRetry = retry + 1; +long delay = 500 * nextRetry; +LOG.error("Failed to load all resource groups 
during initialization, retrying after {}ms", delay); +schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), delay); +throw new RuntimeException(e); +}).thenAccept(__ -> { +if (rgNamespaceConfigListener == null) { +try{ Review Comment: try...catch is unnecessary.
Re: [PR] [fix][broker] Fix ResourceGroups loading [pulsar]
nodece commented on code in PR #21781: URL: https://github.com/apache/pulsar/pull/21781#discussion_r1534138906 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java: ## @@ -41,52 +53,73 @@ * @see https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting;>Global-quotas * */ -public class ResourceGroupConfigListener implements Consumer { +public class ResourceGroupConfigListener implements Consumer, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class); private final ResourceGroupService rgService; private final PulsarService pulsarService; private final ResourceGroupResources rgResources; -private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; +private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; +private final ScheduledExecutorService executorService; public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) { this.rgService = rgService; this.pulsarService = pulsarService; this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources(); -loadAllResourceGroups(); this.rgResources.getStore().registerListener(this); -rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener( -rgService, pulsarService, this); +this.executorService = +Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-resource-group-config")); +execute(() -> loadAllResourceGroupsWithRetryAsync(0)); } -private void loadAllResourceGroups() { -rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) -> { -if (ex != null) { -LOG.error("Exception when fetching resource groups", ex); -return; +@SneakyThrows +private void loadAllResourceGroupsWithRetryAsync(long retry) { +loadAllResourceGroupsAsync().exceptionally(e -> { +long nextRetry = retry + 1; +long delay = 500 * nextRetry; +LOG.error("Failed to load all resource groups 
during initialization, retrying after {}ms", delay); +schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), delay); +throw new RuntimeException(e); +}).thenAccept(__ -> { +if (rgNamespaceConfigListener == null) { +try{ +rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(rgService, pulsarService, this); +}catch (Exception e) { +e.printStackTrace(); +} } +}); +} + +private CompletableFuture loadAllResourceGroupsAsync() { +return rgResources.listResourceGroupsAsync().thenComposeAsync(rgList -> { Review Comment: `executorService` is only used to retry `loadAllResourceGroupsAsync()`.
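The retry logic under review (a delay of `500 * nextRetry` ms, rescheduled on a single-threaded scheduler) can be sketched in isolation roughly as follows. This is only an illustrative sketch, not the PR's code: the class `LinearRetrySketch` and the methods `delayMillis`, `runWithRetry` and `attempt` are hypothetical names, and the task is a plain `Supplier` standing in for `loadAllResourceGroupsAsync()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names come from the PR.
class LinearRetrySketch {

    // Delay before the given 1-based retry, growing linearly with the retry number.
    static long delayMillis(long baseDelayMillis, long nextRetry) {
        return baseDelayMillis * nextRetry;
    }

    // Runs the task; the returned future completes once some attempt succeeds.
    static <T> CompletableFuture<T> runWithRetry(Supplier<CompletableFuture<T>> task,
                                                 long baseDelayMillis,
                                                 ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(task, 0, baseDelayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> task, long retry,
                                    long baseDelayMillis, ScheduledExecutorService scheduler,
                                    CompletableFuture<T> result) {
        task.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // On failure, schedule the next attempt with a longer delay.
            long nextRetry = retry + 1;
            scheduler.schedule(
                    () -> attempt(task, nextRetry, baseDelayMillis, scheduler, result),
                    delayMillis(baseDelayMillis, nextRetry), TimeUnit.MILLISECONDS);
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Stand-in task: fails on the first two calls, succeeds on the third.
        Supplier<CompletableFuture<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new IllegalStateException("not ready"));
            } else {
                f.complete("loaded");
            }
            return f;
        };
        String outcome = runWithRetry(flaky, 10, scheduler).get(5, TimeUnit.SECONDS);
        System.out.println(outcome + " after " + calls.get() + " calls");
        scheduler.shutdown();
    }
}
```

In this sketch the follow-up step (creating the namespace listener, in the PR's case) would hang off the returned future, so it runs exactly once after the first successful attempt and needs no separate null check.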
Re: [PR] [fix][broker] Fix ResourceGroups loading [pulsar]
Technoboy- commented on code in PR #21781: URL: https://github.com/apache/pulsar/pull/21781#discussion_r1534024939 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java: ## @@ -41,52 +53,73 @@ * @see https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting;>Global-quotas * */ -public class ResourceGroupConfigListener implements Consumer { +public class ResourceGroupConfigListener implements Consumer, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class); private final ResourceGroupService rgService; private final PulsarService pulsarService; private final ResourceGroupResources rgResources; -private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; +private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; +private final ScheduledExecutorService executorService; public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) { this.rgService = rgService; this.pulsarService = pulsarService; this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources(); -loadAllResourceGroups(); this.rgResources.getStore().registerListener(this); -rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener( -rgService, pulsarService, this); +this.executorService = +Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-resource-group-config")); +execute(() -> loadAllResourceGroupsWithRetryAsync(0)); } -private void loadAllResourceGroups() { -rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) -> { -if (ex != null) { -LOG.error("Exception when fetching resource groups", ex); -return; +@SneakyThrows +private void loadAllResourceGroupsWithRetryAsync(long retry) { +loadAllResourceGroupsAsync().exceptionally(e -> { +long nextRetry = retry + 1; +long delay = 500 * nextRetry; +LOG.error("Failed to load all resource 
groups during initialization, retrying after {}ms", delay); +schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), delay); +throw new RuntimeException(e); +}).thenAccept(__ -> { +if (rgNamespaceConfigListener == null) { +try{ +rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(rgService, pulsarService, this); +}catch (Exception e) { +e.printStackTrace(); +} } +}); +} + +private CompletableFuture loadAllResourceGroupsAsync() { +return rgResources.listResourceGroupsAsync().thenComposeAsync(rgList -> { Review Comment: we can use the executor for this async ## pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java: ## @@ -41,52 +53,73 @@ * @see https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting;>Global-quotas * */ -public class ResourceGroupConfigListener implements Consumer { +public class ResourceGroupConfigListener implements Consumer, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class); private final ResourceGroupService rgService; private final PulsarService pulsarService; private final ResourceGroupResources rgResources; -private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; +private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; +private final ScheduledExecutorService executorService; public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) { this.rgService = rgService; this.pulsarService = pulsarService; this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources(); -loadAllResourceGroups(); this.rgResources.getStore().registerListener(this); -rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener( -rgService, pulsarService, this); +this.executorService = +Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-resource-group-config")); +execute(() -> 
loadAllResourceGroupsWithRetryAsync(0)); } -private void loadAllResourceGroups() { -rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) -> { -if (ex != null) { -LOG.error("Exception when fetching resource groups", ex); -return; +@SneakyThrows +private void
(pulsar-client-go) branch dependabot/go_modules/google.golang.org/protobuf-1.33.0 deleted (was 9023d880)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/go_modules/google.golang.org/protobuf-1.33.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git was 9023d880 chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(pulsar-client-go) branch master updated: chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 (#1198)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 393f80b4 chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 (#1198) 393f80b4 is described below commit 393f80b4b93faa36936380b643426026a2b2cd02 Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> AuthorDate: Thu Mar 21 22:24:44 2024 +0800 chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 (#1198) Bumps google.golang.org/protobuf from 1.30.0 to 1.33.0. --- updated-dependencies: - dependency-name: google.golang.org/protobuf dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 14 ++ 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 3257d510..51164871 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( golang.org/x/mod v0.8.0 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 - google.golang.org/protobuf v1.30.0 + google.golang.org/protobuf v1.33.0 ) require ( diff --git a/go.sum b/go.sum index 50a1ba3e..0a768a0e 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,6 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= -golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= golang.org/x/mod 
v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -357,8 +355,6 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -420,13 +416,9 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a h1:ppl5mZgokTT8uPkmYOyEUmPTr3ypaKkg5eFOGrAmxxE= -golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term 
v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -435,8 +427,6 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7
Re: [PR] chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 [pulsar-client-go]
RobertIndie merged PR #1198: URL: https://github.com/apache/pulsar-client-go/pull/1198
Re: [PR] [fix] [broker] Close dispatchers stuck due to mismatch between dispatcher.consumerList and dispatcher.consumerSet [pulsar]
poorbarcode commented on PR #22270: URL: https://github.com/apache/pulsar/pull/22270#issuecomment-2012412125 > It looks like we don't need this PR if we follow this https://github.com/apache/pulsar/pull/22283#pullrequestreview-1951994551 to fix the issue. I have tested on my laptop. Changed the implementation: it now only adds defensive code so that the topic can no longer get stuck and fail to unload: remove the consumers that do not match between `consumerSet` and `consumerList`, and print an error log.
Re: [PR] [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet [pulsar]
poorbarcode commented on code in PR #22283: URL: https://github.com/apache/pulsar/pull/22283#discussion_r1533976409 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java: ## @@ -190,9 +190,15 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { } if (isConsumersExceededOnSubscription()) { -log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name); +log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit {}", +name, consumer); return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit")); } +if (consumerSet.contains(consumer)) { Review Comment: Switched to a new solution that rejects subsequent subscribe requests until the previous one is done, which guarantees that there is no race condition from subscribing twice. > I see the purpose of consumerSet is to make consumerFlow and removeConsumer more efficient now. So it won't be an issue. The new solution fixes it.
Re: [PR] [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet [pulsar]
poorbarcode commented on PR #22283: URL: https://github.com/apache/pulsar/pull/22283#issuecomment-2012394478 @codelipenghui > We are using a failed future to ensure the new subscribe request from the same consumer should only be accepted once the old subscribe request is done. So I think the reasonable fix is to avoid the new subscribe request removing the failed future from the old subscribe request. > And we already have the mechanism to remove the failed future finally by the previous subscribe request. Good suggestion; I have changed the solution: subsequent subscribe requests now always fail until the previous one is done, even if the previous request has already timed out from the client's point of view, which guarantees that there is no race condition from subscribing twice.
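The rule described above (a repeated subscribe from the same consumer fails until the earlier one has actually finished on the broker side) can be illustrated with a small standalone sketch. It is an assumption-laden illustration, not the broker's implementation: `SubscribeGateSketch`, its `inFlight` map and the `subscribe` signature are hypothetical, and the real subscribe work is replaced by a `Supplier`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch; these names are not taken from the Pulsar code base.
class SubscribeGateSketch {

    // One entry per consumer id while a subscribe request is still in progress.
    private final ConcurrentHashMap<Long, CompletableFuture<String>> inFlight =
            new ConcurrentHashMap<>();

    CompletableFuture<String> subscribe(long consumerId,
                                        Supplier<CompletableFuture<String>> doSubscribe) {
        CompletableFuture<String> mine = new CompletableFuture<>();
        if (inFlight.putIfAbsent(consumerId, mine) != null) {
            // An earlier request has not finished yet: reject instead of racing with it.
            CompletableFuture<String> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalStateException(
                    "previous subscribe of consumer " + consumerId + " is still in progress"));
            return rejected;
        }
        doSubscribe.get().whenComplete((value, error) -> {
            // Only the request that registered the entry releases it, and only when done.
            inFlight.remove(consumerId, mine);
            if (error != null) {
                mine.completeExceptionally(error);
            } else {
                mine.complete(value);
            }
        });
        return mine;
    }

    public static void main(String[] args) {
        SubscribeGateSketch gate = new SubscribeGateSketch();
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> first = gate.subscribe(1L, () -> slow);
        CompletableFuture<String> second =
                gate.subscribe(1L, () -> CompletableFuture.completedFuture("second"));
        System.out.println("second rejected: " + second.isCompletedExceptionally());
        slow.complete("first");
        System.out.println("first result: " + first.join());
    }
}
```

Because the entry is released only when the underlying operation completes, a client-side timeout on the first request does not open a window for a second request to run concurrently with it.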
[I] Another Multithreading Issues [pulsar-dotpulsar]
gungod2000 opened a new issue, #210: URL: https://github.com/apache/pulsar-dotpulsar/issues/210

Hello, I found another issue; simplified sample code is below. Thanks.

    private class ConsumerEntity
    {
        public IConsumer Consumer { get; set; }
        public string ThreadId { get; set; }
    }

    ConcurrentBag ConsumerPool = new ConcurrentBag();

    private void Form2_Load(object sender, EventArgs e)
    {
        // Just a demo.
        // Like this, it works normally: all messages are received and it is fast.
        for (int i = 0; i < 10; i++)
        {
            Task.Factory.StartNew(() =>
            {
                ReceiveMesage(Thread.CurrentThread.ManagedThreadId.ToString());
            });
        }

        // [issue here]***
        // But like this, receiving messages in a child task of a task, it works badly:
        // receiving is very slow and many consumers do not receive any messages!
        for (int i = 0; i < 10; i++)
        {
            Task.Factory.StartNew(() =>
            {
                for (int j = 0; j < 10; j++)
                {
                    Task.Factory.StartNew(() =>
                    {
                        // Receiving is very slow and many consumers do not receive any messages! I don't understand why.
                        ReceiveMesage(Thread.CurrentThread.ManagedThreadId.ToString());
                    });
                }
            });
        }
    }

    private IMessage ReceiveMesage(string threadId)
    {
        // Demo code.
        ConsumerEntity consumerEntity = new ConsumerEntity();
        // Check whether the consumer pool already has a consumer for this threadId.
        // If it does not exist:
        if (!ConsumerPool.Contains(consumerEntity))
        {
            // Create a new consumer.
            consumerEntity.ThreadId = threadId;
            consumerEntity.Consumer = create Consumer();
            ConsumerPool.Add(consumerEntity);
        }
        CancellationToken cancellationToken = new CancellationToken();
        // Use it.
        return consumerEntity.Consumer.Receive(cancellationToken).GetAwaiter().GetResult();
    }
Re: [I] Multi-threaded consumption issues [pulsar-dotpulsar]
gungod2000 commented on issue #208: URL: https://github.com/apache/pulsar-dotpulsar/issues/208#issuecomment-2012021907

An IndexOutOfRangeException will happen. Thanks. Sample code:

    namespace WindowsFormsApp1
    {
        public partial class Form1 : Form
        {
            public Form1()
            {
                InitializeComponent();
            }

            private static List> ConsumerPool = new List>();

            private void Form1_Load(object sender, EventArgs e)
            {
                // Add a new consumer to the pool.
                ConsumerPool.Add(new consumer()...);
            }

            private void button1_Click(object sender, EventArgs e)
            {
                // Start some tasks to receive messages.
                for (int i = 0; i < 10; i++)
                {
                    Task.Factory.StartNew(() =>
                    {
                        // If a lock is added here, it works fine, but it is too slow.
                        // Get a random consumer from the pool.
                        var consumer = ConsumerPool.GetSingleConsumer();
                        // Receive a message.
                        var mesage = consumer.receive();
                    });
                }
            }
        }
    }
[PR] [improve][fn] Avoid getting stats for a "regex" topic [pulsar]
jiangpengcheng opened a new pull request, #22320: URL: https://github.com/apache/pulsar/pull/22320

Fixes #xyz
Main Issue: #xyz
PIP: #xyz

### Motivation

During the "cleanup" of a function, the code may try to get the stats for a "regex" topic, which is useless, and the error log does not print the `existingConsumers` either.

### Modifications

Do not get stats if the topic is a "regex" topic.

### Verifying this change

- [x] Make sure that the change passes the CI checks.
- [x] This change is a trivial rework / code cleanup without any test coverage.

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: https://github.com/jiangpengcheng/pulsar/pull/29
Re: [PR] [improve][cli] CmdConsume print publishTime And eventTime info. [pulsar]
aloyszhang merged PR #22308: URL: https://github.com/apache/pulsar/pull/22308
(pulsar) branch master updated: [improve][cli] CmdConsume print publishTime And eventTime info. (#22308)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 74585b5ae07 [improve][cli] CmdConsume print publishTime And eventTime info. (#22308) 74585b5ae07 is described below commit 74585b5ae07a5ab10d85f9a8bc80e3093b7cff20 Author: atomchen <492672...@qq.com> AuthorDate: Thu Mar 21 18:29:53 2024 +0800 [improve][cli] CmdConsume print publishTime And eventTime info. (#22308) Co-authored-by: atomchchen --- .../main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java | 3 +++ .../org/apache/pulsar/tests/integration/cli/ClientToolTest.java| 7 --- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java index a7932c732eb..658b34767b5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java @@ -108,6 +108,9 @@ public abstract class AbstractCmdConsume extends AbstractCmd { data = value.toString(); } +sb.append("publishTime:[").append(message.getPublishTime()).append("], "); +sb.append("eventTime:[").append(message.getEventTime()).append("], "); + String key = null; if (message.hasKey()) { key = message.getKey(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java index 571948443b1..0d6b6f1abe4 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClientToolTest.java @@ -86,13 +86,14 @@ public class 
ClientToolTest extends TopicMessagingBase { + "\nError output:\n" + result.getStderr()); } String output = result.getStdout(); -Pattern message = Pattern.compile("- got message -\nkey:\\[null\\], properties:\\[\\], content:(.*)"); +Pattern message = Pattern.compile( +"- got message -\npublishTime:\\[(.*)\\], eventTime:\\[(.*)\\], key:\\[null\\], " ++ "properties:\\[\\], content:(.*)"); Matcher matcher = message.matcher(output); List received = new ArrayList<>(MESSAGE_COUNT); while (matcher.find()) { -received.add(matcher.group(1)); +received.add(matcher.group(3)); } return received; } - }
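The test change above switches from `matcher.group(1)` to `matcher.group(3)` because the two new bracketed fields add two capture groups in front of the content group. A minimal standalone sketch of that parsing, using the pattern exactly as it appears in this archived diff, might look like the following; the class name `ConsumeOutputParseSketch`, the method `contents` and the sample output string are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch; the class, method and sample text are illustrative only.
class ConsumeOutputParseSketch {

    // Pattern copied from the diff as archived: group 1 = publishTime,
    // group 2 = eventTime, group 3 = message content.
    static final Pattern MESSAGE = Pattern.compile(
            "- got message -\npublishTime:\\[(.*)\\], eventTime:\\[(.*)\\], key:\\[null\\], "
                    + "properties:\\[\\], content:(.*)");

    // Collects the content field of every message found in the tool output.
    static List<String> contents(String output) {
        List<String> received = new ArrayList<>();
        Matcher matcher = MESSAGE.matcher(output);
        while (matcher.find()) {
            received.add(matcher.group(3));
        }
        return received;
    }

    public static void main(String[] args) {
        // Made-up output in the shape the pattern expects.
        String output =
                "- got message -\n"
                + "publishTime:[1711016993000], eventTime:[0], key:[null], properties:[], content:hello\n"
                + "- got message -\n"
                + "publishTime:[1711016993001], eventTime:[0], key:[null], properties:[], content:world\n";
        System.out.println(contents(output)); // prints [hello, world]
    }
}
```

Since `.` does not match a line terminator by default, each `(.*)` stays within its own line, which is why the trailing content group ends at the end of the message line.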
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
codelipenghui commented on code in PR #21946: URL: https://github.com/apache/pulsar/pull/21946#discussion_r1533581014 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ## @@ -188,58 +274,134 @@ protected CompletableFuture isLocalTopicActive() { }, brokerService.executor()); } -protected synchronized CompletableFuture closeProducerAsync() { -if (producer == null) { -STATE_UPDATER.set(this, State.Stopped); +/** + * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now. + * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a + * "cursor.readComplete" execute concurrently. + */ +@Deprecated Review Comment: I think we can leave a TODO there to refactor the code. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ## @@ -188,58 +274,134 @@ protected CompletableFuture isLocalTopicActive() { }, brokerService.executor()); } -protected synchronized CompletableFuture closeProducerAsync() { -if (producer == null) { -STATE_UPDATER.set(this, State.Stopped); +/** + * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now. + * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a + * "cursor.readComplete" execute concurrently. + */ +@Deprecated Review Comment: If it is still used in PersistentTopic#checkGC, I don't think we should deprecate this method. Do we have any other alternative for `PersistentTopic#checkGC`?
## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ## @@ -64,10 +70,35 @@ public abstract class AbstractReplicator { protected static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state"); -private volatile State state = State.Stopped; - -protected enum State { -Stopped, Starting, Started, Stopping +@VisibleForTesting +@Getter +protected volatile State state = State.Stopped; + +public enum State { +/** + * This enum has two mean meanings:Init, Stopped. + * Regarding the meaning "Stopped", only {@link PersistentTopic#checkGC} will call {@link #disconnect}, + * so this method only be used by {@link PersistentTopic#checkGC} now. + * TODO After improving the method {@link #disconnect)}, we should rename "Stopped" to "Init". + */ +// The internal producer is stopped. +Stopped, +// Trying to create a new internal producer. +Starting, +// The internal producer has started, and tries copy data. +Started, +/** + * @Deprecated Only {@link PersistentTopic#checkGC} will call {@link #disconnect}, so this method only be + * used by {@link PersistentTopic#checkGC} now. + * TODO After improving the method {@link #disconnect)}, this enum should be removed. + */ +@Deprecated +// The internal producer is trying to stop. +Stopping, Review Comment: We still use `Stopping` when disconnecting the replicator. Why do we need to add the `@Deprecated` annotation? BTW, would it be better to rename `Stopping` and `Stopped` to `Disconnecting` and `Disconnected`? Honestly, it's not easy to understand the difference between stop and terminate, but disconnecting would be clearer in this case.
## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ## @@ -188,58 +274,134 @@ protected CompletableFuture isLocalTopicActive() { }, brokerService.executor()); } -protected synchronized CompletableFuture closeProducerAsync() { -if (producer == null) { -STATE_UPDATER.set(this, State.Stopped); +/** + * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now. + * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a + * "cursor.readComplete" execute concurrently. + */ +@Deprecated +public CompletableFuture disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) { +long backlog = getNumberOfEntriesInBacklog(); +if (failIfHasBacklog && backlog > 0) { +CompletableFuture disconnectFuture = new CompletableFuture<>(); +disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); +if (log.isDebugEnabled()) { +log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId); +} +return disconnectFuture; +} +log.info("[{}] Disconnect replicator at
(pulsar-site) branch main updated: Adding Pulsar Helm Chart 3.3.1 to index.yaml
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new 416c9a8e7711 Adding Pulsar Helm Chart 3.3.1 to index.yaml 416c9a8e7711 is described below commit 416c9a8e771189870a4cf7eb6dd81429fb33b269 Author: Lari Hotari AuthorDate: Thu Mar 21 11:31:07 2024 +0200 Adding Pulsar Helm Chart 3.3.1 to index.yaml --- static/charts/index.yaml | 25 - 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/static/charts/index.yaml b/static/charts/index.yaml index 99fe0fd870e8..6c5b32b9b7d2 100644 --- a/static/charts/index.yaml +++ b/static/charts/index.yaml @@ -18,6 +18,29 @@ apiVersion: v1 entries: pulsar: + - apiVersion: v2 +appVersion: 3.0.3 +created: "2024-03-21T11:30:52.258959+02:00" +dependencies: +- condition: kube-prometheus-stack.enabled + name: kube-prometheus-stack + repository: https://prometheus-community.github.io/helm-charts + version: 56.x.x +description: Apache Pulsar Helm chart for Kubernetes +digest: 01e1822e10811352b2ed25e0b2d6a9d9a90af8efc5d32c82feb9301fab21fc83 +home: https://pulsar.apache.org +icon: https://pulsar.apache.org/img/pulsar.svg +kubeVersion: '>=1.21.0-0' +maintainers: +- email: d...@pulsar.apache.org + name: The Apache Pulsar Team +name: pulsar +sources: +- https://github.com/apache/pulsar +- https://github.com/apache/pulsar-helm-chart +urls: +- https://downloads.apache.org/pulsar/helm-chart/3.3.1/pulsar-3.3.1.tgz +version: 3.3.1 - apiVersion: v2 appVersion: 3.0.2 created: "2024-02-27T16:00:17.329495+02:00" @@ -523,4 +546,4 @@ entries: urls: - https://github.com/apache/pulsar-helm-chart/releases/download/pulsar-2.6.0-1/pulsar-2.6.0-1.tgz version: 2.6.0-1 -generated: "2024-02-27T16:00:17.312053+02:00" +generated: "2024-03-21T11:30:52.241885+02:00"
(pulsar) branch master updated: [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306)
This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 71598c11637 [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306) 71598c11637 is described below commit 71598c1163730defb9fdea85e813fe863c3fe4d2 Author: atomchen <492672...@qq.com> AuthorDate: Thu Mar 21 17:30:40 2024 +0800 [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process (#22306) Co-authored-by: atomchchen --- .../client/api/SimpleProducerConsumerTest.java | 6 ++-- .../client/impl/ProducerMemoryLimitTest.java | 12 .../pulsar/client/impl/ProducerSemaphoreTest.java | 18 ++-- .../client/impl/AbstractBatchMessageContainer.java | 9 -- .../client/impl/BatchMessageContainerImpl.java | 10 +++ .../org/apache/pulsar/client/impl/ClientCnx.java | 3 +- .../pulsar/client/impl/ConnectionHandler.java | 7 + .../apache/pulsar/client/impl/ConsumerImpl.java| 3 +- .../apache/pulsar/client/impl/ProducerImpl.java| 32 ++ 9 files changed, 60 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 4536bda907b..4c106d39e7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -93,13 +93,13 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.ClientBuilderImpl; -import org.apache.pulsar.client.impl.ClientCnx; import 
org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; +import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; @@ -3906,11 +3906,11 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { .topic("persistent://my-property/my-ns/my-topic2"); @Cleanup -Producer producer = producerBuilder.create(); +ProducerImpl producer = (ProducerImpl)producerBuilder.create(); List> futures = new ArrayList<>(); // Asynchronously produce messages -byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1]; +byte[] message = new byte[producer.getConnectionHandler().getMaxMessageSize() + 1]; for (int i = 0; i < maxPendingMessages + 10; i++) { Future future = producer.sendAsync(message); try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index d776fdb0ed9..55a67ae644d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBufAllocator; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -33,7 +34,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import 
org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SizeUnit; -import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -69,10 +69,12 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase { .create(); this.stopBroker(); try { -try (MockedStatic mockedStatic = Mockito.mockStatic(ClientCnx.class)) { -mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8); -producer.send("memory-test".getBytes(StandardCharsets.UTF_8)); -} +ConnectionHandler connectionHandler =
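The commit above replaces the static `ClientCnx.getMaxMessageSize()` with a per-producer `getConnectionHandler().getMaxMessageSize()`. A minimal sketch of why a process-wide value goes wrong with several clusters might look like the following. It is a toy model in Python, not the Pulsar client code: the class names `StaticCnx` and `PerConnectionHandler`, the `on_connected` hook, and the 5 MB starting value are all assumptions made for illustration.

```python
MB = 1024 * 1024


class StaticCnx:
    """Hypothetical model of the old design: one value shared by the whole process."""

    max_message_size = 5 * MB  # assumed starting value, shared by every connection

    @classmethod
    def on_connected(cls, broker_max_message_size):
        # Every new connection overwrites the single shared value.
        cls.max_message_size = broker_max_message_size


class PerConnectionHandler:
    """Hypothetical model of the new design: each handler keeps its own broker's limit."""

    def __init__(self):
        self.max_message_size = 5 * MB  # assumed starting value

    def on_connected(self, broker_max_message_size):
        self.max_message_size = broker_max_message_size


def demo():
    # Cluster A allows 1 MB messages, cluster B allows 10 MB messages.
    StaticCnx.on_connected(1 * MB)   # connect to cluster A
    StaticCnx.on_connected(10 * MB)  # connect to cluster B, overwriting A's limit
    limit_seen_for_a_static = StaticCnx.max_message_size

    handler_a, handler_b = PerConnectionHandler(), PerConnectionHandler()
    handler_a.on_connected(1 * MB)
    handler_b.on_connected(10 * MB)
    return limit_seen_for_a_static, handler_a.max_message_size, handler_b.max_message_size
```

In this model a producer for cluster A that consults the shared value would believe it may send 10 MB messages, while the per-connection variant keeps the two limits apart.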
Re: [PR] [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process [pulsar]
aloyszhang merged PR #22306: URL: https://github.com/apache/pulsar/pull/22306 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][cli] CmdConsume print publishTime And eventTime info. [pulsar]
chenhongSZ commented on PR #22308: URL: https://github.com/apache/pulsar/pull/22308#issuecomment-2011734030 /pulsarbot run-failure-checks
(pulsar-helm-chart) annotated tag pulsar-3.3.1 updated (43ed6f5 -> ac187ad)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to annotated tag pulsar-3.3.1 in repository https://gitbox.apache.org/repos/asf/pulsar-helm-chart.git *** WARNING: tag pulsar-3.3.1 was modified! *** from 43ed6f5 (commit) to ac187ad (tag) tagging 43ed6f543425059509b0e8593f85811ece179f75 (commit) replaces pulsar-3.3.0 by Lari Hotari on Thu Mar 21 11:28:07 2024 +0200 - Log - Apache Pulsar Helm Chart 3.3.1 -BEGIN PGP SIGNATURE- iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmX7/agACgkQ0/pn1SLF UlZ5YBAAtAjZRhTHtwhSj/IBl8ylyAMP03Ex3Cinvdg+WbwiItd0UNsKq0N3LW3b 58IXpq11kgxN4OkNLrhyFRo1zXZQE2IPkf/Gg0Wm/o216sDnG69lFG3OAxh2Fdf3 8gdIEedk20BiZKLNuqwZxmkjrlCiyeSWU4KctRkrlliISfxyWJfHKNRaFSWeVDma KPBWi1v0hedJiLveuuccd1yeOkFKsUnzBsDhd8RiGQS3qq2xI83skhrm1W3KAHHN 93sDmmm8fSqWUAdoPS450vw3GW1YqcuTixeBO4x8NhS+1M25WvmNGJRSoOWFN6V+ fjEowd6qvvi5CD7S6JmuV7tT9o7q30xx/FbYh2eA5FL1L4S+vqQyhf4L28ehD555 TZdJbRaZmLie2ISBtnNgkg0kcD8accKzmkWt6OSIq41ML4G777KSemhBqT4AjYb1 oi+apuGQy3o9+DD3NmYq+kLGoqInFqKwocAwCSVkOEC/y3NTcvo1Y2sWE6alv2Kz ltJbEA3Tf1nNOKAOGeZ1ec5HJ535DHfdhHbE9yACS38iJ3scW17edZYGZ8exh/ur J93C2BfcTuPrSRHK+TFX/FUtwx7hQY1Pf1dHY8SNg+CzgEpxBs1zpuq1PBPpC55q vI4RRTC1vIJ9Uxuvwtv2/C3pORI+oeiwt4SqgWMYd/x2qGrW8kw= =7gg2 -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r68036 - /dev/pulsar/helm-chart/3.3.1-candidate-1/ /release/pulsar/helm-chart/3.3.1/
Author: lhotari
Date: Thu Mar 21 09:27:55 2024
New Revision: 68036

Log:
Release Pulsar Helm Chart 3.3.1 from 3.3.1-candidate-1

Added:
    release/pulsar/helm-chart/3.3.1/
      - copied from r68035, dev/pulsar/helm-chart/3.3.1-candidate-1/
Removed:
    dev/pulsar/helm-chart/3.3.1-candidate-1/
svn commit: r68035 - /dev/pulsar/helm-chart/3.3.1-candidate-1/index.yaml
Author: lhotari
Date: Thu Mar 21 09:27:42 2024
New Revision: 68035

Log:
Remove temporary index.yaml file

Removed:
    dev/pulsar/helm-chart/3.3.1-candidate-1/index.yaml
[I] [Bug] Broker startup could get into a long crash loop when Functions are enabled [pulsar-helm-chart]
lhotari opened a new issue, #473: URL: https://github.com/apache/pulsar-helm-chart/issues/473 **Describe the bug** on pulsar-broker-0, this is the last entry from the logs ``` Caused by: java.net.UnknownHostException: pulsar-broker-2.pulsar-broker.default.svc.cluster.local: Name or service not known at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method) ~[?:?] at java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:934) ~[?:?] at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1543) ~[?:?] at java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:852) ~[?:?] at java.net.InetAddress.getAllByName0(InetAddress.java:1533) ~[?:?] at java.net.InetAddress.getAllByName(InetAddress.java:1385) ~[?:?] at java.net.InetAddress.getAllByName(InetAddress.java:1306) ~[?:?] at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:169) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:166) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at java.security.AccessController.doPrivileged(AccessController.java:569) ~[?:?] at io.netty.util.internal.SocketUtils.allAddressesByName(SocketUtils.java:166) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at io.netty.resolver.DefaultNameResolver.doResolveAll(DefaultNameResolver.java:50) ~[io.netty-netty-resolver-4.1.100.Final.jar:4.1.100.Final] at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:79) ~[io.netty-netty-resolver-4.1.100.Final.jar:4.1.100.Final] at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:71) ~[io.netty-netty-resolver-4.1.100.Final.jar:4.1.100.Final] at org.asynchttpclient.resolver.RequestHostnameResolver.resolve(RequestHostnameResolver.java:50) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] 
at org.asynchttpclient.netty.request.NettyRequestSender.resolveAddresses(NettyRequestSender.java:357) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.request.NettyRequestSender.sendRequestWithNewChannel(NettyRequestSender.java:300) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.request.NettyRequestSender.sendRequestWithCertainForceConnect(NettyRequestSender.java:142) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.request.NettyRequestSender.sendRequest(NettyRequestSender.java:113) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.request.NettyRequestSender.sendNextRequest(NettyRequestSender.java:548) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.handler.intercept.Redirect30xInterceptor.exitAfterHandlingRedirect(Redirect30xInterceptor.java:160) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.handler.intercept.Interceptors.exitAfterIntercept(Interceptors.java:98) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.handler.HttpHandler.handleHttpResponse(HttpHandler.java:78) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:140) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] at org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78) ~[org.asynchttpclient-async-http-client-2.12.1.jar:?] 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final] at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[io.netty-netty-codec-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.100.Final.jar:4.1.100.Final] at
[PR] [improve][misc] Include native epoll library for Netty for arm64 [pulsar]
lhotari opened a new pull request, #22319: URL: https://github.com/apache/pulsar/pull/22319 ### Motivation When running Pulsar in Docker on MacOS Apple Silicon M3, I get this error message: ``` Caused by: java.io.FileNotFoundException: META-INF/native/libnetty_transport_native_epoll_aarch_64.so at io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:186) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:323) ~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.epoll.Native.(Native.java:85) ~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.epoll.Epoll.(Epoll.java:40) ~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final] at org.apache.pulsar.common.util.netty.EventLoopUtil.newEventLoopGroup(EventLoopUtil.java:59) ~[org.apache.pulsar-pulsar-common-3.0.3.jar:3.0.3] at org.apache.pulsar.broker.PulsarService.(PulsarService.java:337) ~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3] at org.apache.pulsar.PulsarBrokerStarter$BrokerStarter.(PulsarBrokerStarter.java:204) ~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3] at org.apache.pulsar.PulsarBrokerStarter.main(PulsarBrokerStarter.java:333) ~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3] Suppressed: java.lang.UnsatisfiedLinkError: no netty_transport_native_epoll_aarch_64 in java.library.path: /usr/java/packages/lib:/usr/lib64:/lib64:/lib:/usr/lib at java.lang.ClassLoader.loadLibrary(ClassLoader.java:2434) ~[?:?] at java.lang.Runtime.loadLibrary0(Runtime.java:818) ~[?:?] at java.lang.System.loadLibrary(System.java:1993) ~[?:?] 
at io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:396) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:161) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:323) ~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.epoll.Native.(Native.java:85) ~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final] at io.netty.channel.epoll.Epoll.(Epoll.java:40) ~[io.netty-netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final] at org.apache.pulsar.common.util.netty.EventLoopUtil.newEventLoopGroup(EventLoopUtil.java:59) ~[org.apache.pulsar-pulsar-common-3.0.3.jar:3.0.3] at org.apache.pulsar.broker.PulsarService.(PulsarService.java:337) ~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3] at org.apache.pulsar.PulsarBrokerStarter$BrokerStarter.(PulsarBrokerStarter.java:204) ~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3] at org.apache.pulsar.PulsarBrokerStarter.main(PulsarBrokerStarter.java:333) ~[org.apache.pulsar-pulsar-broker-3.0.3.jar:3.0.3] Suppressed: java.lang.UnsatisfiedLinkError: no netty_transport_native_epoll_aarch_64 in java.library.path: /usr/java/packages/lib:/usr/lib64:/lib64:/lib:/usr/lib at java.lang.ClassLoader.loadLibrary(ClassLoader.java:2434) ~[?:?] at java.lang.Runtime.loadLibrary0(Runtime.java:818) ~[?:?] at java.lang.System.loadLibrary(System.java:1993) ~[?:?] at io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?] 
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?] at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?] at io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:430) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at java.security.AccessController.doPrivileged(AccessController.java:318) ~[?:?] at io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:422) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:388) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final] at io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:161) ~[io.netty-netty-common-4.1.100.Final.jar:4.1.100.Final]
Re: [PR] Add documents for the batching arguments when creating producer [pulsar-client-python]
merlimat merged PR #205: URL: https://github.com/apache/pulsar-client-python/pull/205
(pulsar-client-python) branch main updated: Add documents for the batching arguments when creating producer (#205)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git The following commit(s) were added to refs/heads/main by this push: new 2a8819d Add documents for the batching arguments when creating producer (#205) 2a8819d is described below commit 2a8819def9a2b5eebdd6c2260ff3cbad6f0b1ef1 Author: Yunze Xu AuthorDate: Thu Mar 21 17:00:05 2024 +0800 Add documents for the batching arguments when creating producer (#205) --- pulsar/__init__.py | 14 ++ 1 file changed, 14 insertions(+) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index a46c209..9590fa3 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -670,6 +670,20 @@ class Client: SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that release in order to be able to receive messages compressed with SNAPPY. +batching_enabled: bool, default=False +When automatic batching is enabled, multiple calls to `send` can result in a single batch to be sent to the +broker, leading to better throughput, especially when publishing small messages. +All messages in a batch will be published as a single batched message. The consumer will be delivered +individual messages in the batch in the same order they were enqueued. +batching_max_messages: int, default=1000 +When you set this option to a value greater than 1, messages are queued until this threshold or +`batching_max_allowed_size_in_bytes` is reached or batch interval has elapsed. +batching_max_allowed_size_in_bytes: int, default=128*1024 +When you set this option to a value greater than 1, messages are queued until this threshold or +`batching_max_messages` is reached or batch interval has elapsed. +batching_max_publish_delay_ms: int, default=10 +The batch interval in milliseconds. 
Queued messages will be sent in batch after this interval even if both +the threshold of `batching_max_messages` and `batching_max_allowed_size_in_bytes` are not reached. max_pending_messages: int, default=1000 Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. max_pending_messages_across_partitions: int, default=5
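The docstring added above says a batch is sent once `batching_max_messages` or `batching_max_allowed_size_in_bytes` is reached, or once the `batching_max_publish_delay_ms` interval has elapsed. A rough sketch of that rule, assuming the thresholds are inclusive (the docstring does not say), could be written as below. The `BatchLimits` class and `should_flush` function are hypothetical helpers for illustration; the real decision is made inside the client library, not by this code.

```python
from dataclasses import dataclass


@dataclass
class BatchLimits:
    # Defaults copied from the docstring in the diff above.
    batching_max_messages: int = 1000
    batching_max_allowed_size_in_bytes: int = 128 * 1024
    batching_max_publish_delay_ms: int = 10


def should_flush(limits, queued_messages, queued_bytes, elapsed_ms):
    """Return True when any one of the three documented conditions is met.

    Assumption: each threshold is treated as inclusive (>=).
    """
    return (
        queued_messages >= limits.batching_max_messages
        or queued_bytes >= limits.batching_max_allowed_size_in_bytes
        or elapsed_ms >= limits.batching_max_publish_delay_ms
    )
```

With the default limits, ten small messages queued for 5 ms would stay in the queue, while the same ten messages would be sent as soon as 10 ms have passed.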
Re: [I] Document for the batching arguments when creating producer [pulsar-client-python]
merlimat closed issue #187: Document for the batching arguments when creating producer URL: https://github.com/apache/pulsar-client-python/issues/187
Re: [PR] Add support for HasMessageAvailable [pulsar-dotpulsar]
blankensteiner commented on PR #207: URL: https://github.com/apache/pulsar-dotpulsar/pull/207#issuecomment-2011678421 Hi @smbecker, and thank you for the PR :-) Regarding "bool HasReachedEndOfTopic();": this is a state and could be checked that way. Regarding "ValueTask HasMessageAvailable(CancellationToken cancellationToken = default);": I'm not sure why that is needed or why it is async. If you want to call "HasMessageAvailable" to make sure that Receive does not block, that cannot be guaranteed, because HasMessageAvailable might return true, but before you get to call Receive the Reader/Consumer may have lost the connection or faulted. Is what you really need something like this: bool TryReceive(out Message message)?
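The concern in the comment above is a check-then-act race: the answer from an availability check can be stale by the time Receive is called. A small sketch of the difference between a separate check and a combined try-receive follows. It is written in Python against a hypothetical in-memory `FakeReader`; none of these names are part of the DotPulsar API, and `simulate_fault` merely stands in for a lost connection.

```python
import threading
from collections import deque


class FakeReader:
    """Hypothetical in-memory stand-in for a reader, for illustration only."""

    def __init__(self, messages):
        self._lock = threading.Lock()
        self._pending = deque(messages)

    def has_message_available(self):
        # The answer is only valid at the instant it is computed.
        with self._lock:
            return len(self._pending) > 0

    def try_receive(self):
        # Check and take happen under one lock, so the result cannot go stale.
        with self._lock:
            if self._pending:
                return True, self._pending.popleft()
            return False, None

    def simulate_fault(self):
        # Stand-in for a lost connection between the check and the receive.
        with self._lock:
            self._pending.clear()


def stale_check_demo():
    reader = FakeReader(["m1"])
    looked_available = reader.has_message_available()
    reader.simulate_fault()
    received, message = reader.try_receive()
    return looked_available, received, message
```

Here the check reports a message, yet nothing can be received afterwards; a caller that relies only on `try_receive` never acts on the stale answer.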
Re: [I] SendChannel.Completion causes the producer to become Faulted [pulsar-dotpulsar]
blankensteiner commented on issue #209: URL: https://github.com/apache/pulsar-dotpulsar/issues/209#issuecomment-2011663856 @kandersen82 could you have a look at this?
Re: [I] Multi-threaded consumption issues [pulsar-dotpulsar]
blankensteiner commented on issue #208: URL: https://github.com/apache/pulsar-dotpulsar/issues/208#issuecomment-2011662348 Hi @gungod2000 Please fill out the bug report template, providing a full sample and stack trace.
Re: [PR] Add documents for the batching arguments when creating producer [pulsar-client-python]
BewareMyPower commented on PR #205: URL: https://github.com/apache/pulsar-client-python/pull/205#issuecomment-2011602289 https://github.com/apache/pulsar-client-python/assets/18204803/74560245-4f6d-4a03-ab29-cb782b151ac4
Re: [PR] [improve] [pip] PIP-344 Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName) [pulsar]
poorbarcode commented on code in PR #22182:
URL: https://github.com/apache/pulsar/pull/22182#discussion_r1533389551

## pip/pip-344.md: ##
@@ -0,0 +1,115 @@
+# PIP-344: Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName)
+
+# Background knowledge
+
+**Topic auto-creation**
+- The partitioned topic auto-creation is dependent on `pulsarClient.getPartitionsForTopic`
+- It triggers partitioned metadata creation by `pulsarClient.getPartitionsForTopic`
+- And triggers the topic partition creation by producers' registration and consumers' registration.
+- When calling `pulsarClient.getPartitionsForTopic(topicName)`, Pulsar will automatically create the partitioned topic metadata if it does not exist, either using `HttpLookupService` or `BinaryProtoLookupService`.
+
+**Now `pulsarClient.getPartitionsForTopic`'s behavior**
+| case | broker allow `auto-create` | param allow `create if not exists` | non-partitioned topic | partitioned topic | current behavior |
+| --- | --- | --- | --- | --- | --- |
+| 1 | `true/false` | `true/false` | `exists: true` | | REST API: `partitions: 0` Binary API: `partitions: 0` |
+| 2 | `true/false` | `true/false` | | `exists: true` `partitions: 3` | REST API: `partitions: 3` Binary API: `partitions: 3` |
+| 3 | `true` | `true` | | | REST API: - `create new: true` - `partitions: 3` Binary API: - `create new: true` - `partitions: 3` |
+| 4 | `true` | `false` | | | REST API: - `create new: false` - `partitions: 0` Binary API: not support |
+| 5 | `false` | `true` | | | REST API: - `create new: false` - `partitions: 0` Binary API: - `create new: false` - `partitions: 0` |
+
+- Broker allows `auto-create`: see also the config `allowAutoTopicCreation` in `broker.conf`.
+- Param allow `create if not exists`
+  - Regarding the HTTP API `PersistentTopics.getPartitionedMetadata`, it is an optional param which named `checkAllowAutoCreation,` and the default value is `false`.
+  - Regarding the `pulsar-admin` API, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`, and it always sets the param `checkAllowAutoCreation` to `false` and can not be set manually.
+  - Regarding the client API `HttpLookupService.getPartitionedTopicMetadata`, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`, and it always sets the param `checkAllowAutoCreation` to `true` and can not be set manually.
+  - Regarding the client API `BinaryProtoLookupService.getPartitionedTopicMetadata`, it always tries to create partitioned metadata.
+- `REST API & HTTP API`: Since there are only two implementations of the 4 ways to get partitioned metadata, we call HTTP API `PersistentTopics.getPartitionedMetadata`, `pulsar-admin`, and `HttpLookupService.getPartitionedTopicMetadata` HTTP API, and call `BinaryProtoLookupService.getPartitionedTopicMetadata` Binary API.
+
+# Motivation
+
+The param `create if not exists` of the Binary API is always `true.`
+
+- For case 4 of `pulsarClient.getPartitionsForTopic`'s behavior, it always tries to create the partitioned metadata, but the API name is `getxxx`.
+- For case 5 of `pulsarClient.getPartitionsForTopic`'s behavior, it returns a `0` partitioned metadata, but the topic does not exist. For the correct behavior of this case, we had discussed [here](https://github.com/apache/pulsar/issues/8813) before.
+- BTW, [flink-connector-pulsar](https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L221-L227) is using this API to create partitioned topic metadata.
+
+# Goals
+
+- Regarding the case 4: Add a new API `PulsarClient.getPartitionsForTopic(String, boolean)` to support the feature that just get partitioned topic metadata and do not try to create one. See detail below.
+- Regarding the case 5: Instead of returning a `0` partitioned metadata, respond to a not found error when calling `pulsarClient.getPartitionsForTopic(String)` if the topic does not exist.
+
+# Detailed Design
+
+## Public-facing Changes
+
+When you call the public API `pulsarClient.getPartitionsForTopic`, pulsar will not create the partitioned metadata anymore.
+
+### Public API
+**LookupService.java**
+```java
+
+- CompletableFuture getPartitionedTopicMetadata(TopicName topicName);
+
++ // A new API that contains an additional param "createIfAutoCreationEnabled."
++ CompletableFuture getPartitionedTopicMetadata(TopicName topicName, boolean createIfAutoCreationEnabled);

Review Comment:
Sure, Improved the description for the new APIs.
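The "current behavior" column of the table quoted above can be read as a small decision function. The sketch below encodes only the REST API outcomes listed for cases 1 to 5; it is a model in Python, not Pulsar code. The function name `current_rest_lookup`, the use of `None` for "no topic exists", and the `default_num_partitions=3` value (taken from the table's example) are assumptions, and the combination "broker `false`, param `false`", which the table does not list, is assumed to behave like cases 4 and 5.

```python
from typing import Optional, Tuple


def current_rest_lookup(
    broker_allows_auto_create: bool,
    create_if_not_exists: bool,
    existing_partitions: Optional[int],
    default_num_partitions: int = 3,
) -> Tuple[bool, int]:
    """Return (created_new, partitions) as described for the REST API in the table.

    existing_partitions: None if no topic exists, 0 for an existing
    non-partitioned topic, N for an existing partitioned topic.
    """
    if existing_partitions is not None:
        # Cases 1 and 2: an existing topic is reported as-is, nothing is created.
        return False, existing_partitions
    if broker_allows_auto_create and create_if_not_exists:
        # Case 3: the metadata is created with the default partition count.
        return True, default_num_partitions
    # Cases 4 and 5: nothing is created and 0 partitions are reported,
    # even though the topic does not exist.
    return False, 0
```

The last branch is the one the PIP's goals target: a result of `0` partitions there cannot be told apart from case 1, where a non-partitioned topic really exists.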
(pulsar-client-go) branch master updated: [fix] Change the wrong `SourceInstanceStatusData` in SinkInstanceStatus (#1199)
This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

The following commit(s) were added to refs/heads/master by this push:
     new 3693b369 [fix] Change the wrong `SourceInstanceStatusData` in SinkInstanceStatus (#1199)
3693b369 is described below

commit 3693b3695e2f072f9506b64b5e3000e5f107070d
Author: jiangpengcheng
AuthorDate: Thu Mar 21 15:39:08 2024 +0800

    [fix] Change the wrong `SourceInstanceStatusData` in SinkInstanceStatus (#1199)
---
 pulsaradmin/pkg/utils/sink_status.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsaradmin/pkg/utils/sink_status.go b/pulsaradmin/pkg/utils/sink_status.go
index 6cdb091f..a2651959 100644
--- a/pulsaradmin/pkg/utils/sink_status.go
+++ b/pulsaradmin/pkg/utils/sink_status.go
@@ -28,8 +28,8 @@ type SinkStatus struct {
 }
 
 type SinkInstanceStatus struct {
-	InstanceID int                      `json:"instanceId"`
-	Status     SourceInstanceStatusData `json:"status"`
+	InstanceID int                    `json:"instanceId"`
+	Status     SinkInstanceStatusData `json:"status"`
 }
 
 type SinkInstanceStatusData struct {
Re: [PR] [fix] Change the wrong `SourceInstanceStatusData` in `SinkInstanceStatus` [pulsar-client-go]
RobertIndie merged PR #1199:
URL: https://github.com/apache/pulsar-client-go/pull/1199
Re: [PR] [improve] [pip] PIP-344 Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName) [pulsar]
RobertIndie commented on code in PR #22182:
URL: https://github.com/apache/pulsar/pull/22182#discussion_r1533359134

## pip/pip-344.md: ##
@@ -0,0 +1,115 @@
+# PIP-344: Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName)
+
+# Background knowledge
+
+**Topic auto-creation**
+- The partitioned topic auto-creation is dependent on `pulsarClient.getPartitionsForTopic`
+- It triggers partitioned metadata creation by `pulsarClient.getPartitionsForTopic`
+- And triggers the topic partition creation by producers' registration and consumers' registration.
+- When calling `pulsarClient.getPartitionsForTopic(topicName)`, Pulsar will automatically create the partitioned topic metadata if it does not exist, either using `HttpLookupService` or `BinaryProtoLookupService`.
+
+**Now `pulsarClient.getPartitionsForTopic`'s behavior**
+| case | broker allow `auto-create` | param allow `create if not exists` | non-partitioned topic | partitioned topic | current behavior |
+| --- | --- | --- | --- | --- | --- |
+| 1 | `true/false` | `true/false` | `exists: true` | | REST API: `partitions: 0` Binary API: `partitions: 0` |
+| 2 | `true/false` | `true/false` | | `exists: true` `partitions: 3` | REST API: `partitions: 3` Binary API: `partitions: 3` |
+| 3 | `true` | `true` | | | REST API: - `create new: true` - `partitions: 3` Binary API: - `create new: true` - `partitions: 3` |
+| 4 | `true` | `false` | | | REST API: - `create new: false` - `partitions: 0` Binary API: not support |
+| 5 | `false` | `true` | | | REST API: - `create new: false` - `partitions: 0` Binary API: - `create new: false` - `partitions: 0` |
+
+- Broker allows `auto-create`: see also the config `allowAutoTopicCreation` in `broker.conf`.
+- Param allow `create if not exists`
+  - Regarding the HTTP API `PersistentTopics.getPartitionedMetadata`, it is an optional param which named `checkAllowAutoCreation,` and the default value is `false`.
+  - Regarding the `pulsar-admin` API, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`, and it always sets the param `checkAllowAutoCreation` to `false` and can not be set manually.
+  - Regarding the client API `HttpLookupService.getPartitionedTopicMetadata`, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`, and it always sets the param `checkAllowAutoCreation` to `true` and can not be set manually.
+  - Regarding the client API `BinaryProtoLookupService.getPartitionedTopicMetadata`, it always tries to create partitioned metadata.
+- `REST API & HTTP API`: Since there are only two implementations of the 4 ways to get partitioned metadata, we call HTTP API `PersistentTopics.getPartitionedMetadata`, `pulsar-admin`, and `HttpLookupService.getPartitionedTopicMetadata` HTTP API, and call `BinaryProtoLookupService.getPartitionedTopicMetadata` Binary API.
+
+# Motivation
+
+The param `create if not exists` of the Binary API is always `true.`
+
+- For case 4 of `pulsarClient.getPartitionsForTopic`'s behavior, it always tries to create the partitioned metadata, but the API name is `getxxx`.
+- For case 5 of `pulsarClient.getPartitionsForTopic`'s behavior, it returns a `0` partitioned metadata, but the topic does not exist. For the correct behavior of this case, we had discussed [here](https://github.com/apache/pulsar/issues/8813) before.
+- BTW, [flink-connector-pulsar](https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L221-L227) is using this API to create partitioned topic metadata.
+
+# Goals
+
+- Regarding the case 4: Add a new API `PulsarClient.getPartitionsForTopic(String, boolean)` to support the feature that just get partitioned topic metadata and do not try to create one. See detail below.
+- Regarding the case 5: Instead of returning a `0` partitioned metadata, respond to a not found error when calling `pulsarClient.getPartitionsForTopic(String)` if the topic does not exist.
+
+# Detailed Design
+
+## Public-facing Changes
+
+When you call the public API `pulsarClient.getPartitionsForTopic`, pulsar will not create the partitioned metadata anymore.
+
+### Public API
+**LookupService.java**
+```java
+
+- CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName);
+
++ // A new API that contains an additional param "createIfAutoCreationEnabled."
++ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName, boolean createIfAutoCreationEnabled);

Review Comment:
I think we should document clearly which exceptions will be thrown exactly. The exception should also be part of the interface changes, so I think we need to explain it clearly in the proposal. IIUC, this method may throw two types of exceptions: not found and not supported. But which specific exception class is used?
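To make the behavior table and the two goals quoted above easier to follow, here is a minimal in-memory sketch of the proposed lookup semantics. It is not Pulsar code: `PartitionLookupSketch`, `addExistingTopic`, and `TopicNotFoundException` are hypothetical names invented for illustration, and the exception class in particular is an assumption, since the review comment above notes that the proposal does not yet name one. The sketch also assumes that a missing topic is reported as "not found" whenever nothing is created (cases 4 and 5).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the proposed lookup semantics; not the Pulsar implementation.
class PartitionLookupSketch {
    // Hypothetical stand-in for the "not found" error; the proposal does not name the class.
    static class TopicNotFoundException extends RuntimeException {
        TopicNotFoundException(String topic) {
            super("Topic " + topic + " does not exist");
        }
    }

    // Topic name -> partition count; 0 models an existing non-partitioned topic.
    private final Map<String, Integer> topics = new HashMap<>();
    // Models the broker-side setting allowAutoTopicCreation.
    private final boolean brokerAllowsAutoCreate;
    // Partition count used when metadata is auto-created (3 in the table above).
    private final int defaultPartitions;

    PartitionLookupSketch(boolean brokerAllowsAutoCreate, int defaultPartitions) {
        this.brokerAllowsAutoCreate = brokerAllowsAutoCreate;
        this.defaultPartitions = defaultPartitions;
    }

    void addExistingTopic(String topic, int partitions) {
        topics.put(topic, partitions);
    }

    CompletableFuture<Integer> getPartitions(String topic, boolean createIfAutoCreationEnabled) {
        Integer existing = topics.get(topic);
        if (existing != null) {
            // Cases 1 and 2: the topic exists, so both flags are irrelevant.
            return CompletableFuture.completedFuture(existing);
        }
        if (brokerAllowsAutoCreate && createIfAutoCreationEnabled) {
            // Case 3: create the partitioned metadata and return it.
            topics.put(topic, defaultPartitions);
            return CompletableFuture.completedFuture(defaultPartitions);
        }
        // Cases 4 and 5 (assumed): nothing is created, so fail instead of returning 0.
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new TopicNotFoundException(topic));
        return failed;
    }
}
```

In this model a caller that only wants to read metadata passes `false` and handles the failed future, while a caller that relies on auto-creation passes `true` explicitly.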
(pulsar) branch master updated: [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311)
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 5cabcacbfa8 [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311)
5cabcacbfa8 is described below

commit 5cabcacbfa8874931d501cd040f7a8ac3d6d1923
Author: Jiwei Guo
AuthorDate: Thu Mar 21 15:24:50 2024 +0800

    [improve][admin] Fix the `createMissingPartitions` doesn't response correctly (#22311)
---
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 +++-
 .../java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 7 +++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 10cf5edd3c3..86993f749b5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -487,7 +487,7 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
         getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
-            if (metadata != null) {
+            if (metadata != null && metadata.partitions > 0) {
                 CompletableFuture<Void> future = validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.CREATE_TOPIC);
                 future.thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
@@ -497,6 +497,8 @@ public class PersistentTopicsBase extends AdminResource {
                     resumeAsyncResponseExceptionally(asyncResponse, e);
                     return null;
                 });
+            } else {
+                throw new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName));
             }
         }).exceptionally(ex -> {
             // If the exception is not redirect exception we need to log it.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 23cb413614f..9a292175caa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -1779,4 +1780,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         assertTrue(namespaces.contains(ns1V2));
         assertTrue(namespaces.contains(ns1V1));
     }
+
+    @Test
+    public void testCreateMissingPartitions() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testCreateMissingPartitions";
+        assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName));
+    }
 }
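The guard introduced by this commit can be read in isolation as a small decision rule: create missing partitions only when partitioned metadata exists and reports at least one partition, otherwise report "not found". The following is a minimal standalone sketch of that rule, not the broker code. `MissedPartitionsGuard`, `Metadata`, and `NotFound` are hypothetical stand-ins (for the enclosing method, `PartitionedTopicMetadata`, and `RestException(Status.NOT_FOUND, ...)` respectively) so that the example compiles without Pulsar on the classpath.

```java
// Hypothetical standalone model of the guard added in the diff above.
class MissedPartitionsGuard {
    // Hypothetical stand-in for PartitionedTopicMetadata.
    static final class Metadata {
        final int partitions;

        Metadata(int partitions) {
            this.partitions = partitions;
        }
    }

    // Hypothetical stand-in for RestException(Status.NOT_FOUND, ...).
    static class NotFound extends RuntimeException {
        NotFound(String message) {
            super(message);
        }
    }

    // Returns how many partitions should be checked and created, or throws
    // NotFound when the metadata is missing or reports zero partitions.
    static int partitionsToCreate(String topicName, Metadata metadata) {
        if (metadata != null && metadata.partitions > 0) {
            return metadata.partitions;
        }
        throw new NotFound(String.format("Topic %s does not exist", topicName));
    }
}
```

Before the change, only the `metadata != null` half of the condition was checked, so metadata reporting `0` partitions passed the guard; with the extra check such a topic now takes the "not found" path, which is the outcome the new `testCreateMissingPartitions` test expects.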
Re: [PR] [improve][admin] Fix the `createMissingPartitions` doesn't response correctly [pulsar]
Technoboy- merged PR #22311:
URL: https://github.com/apache/pulsar/pull/22311
Re: [PR] [fix][client]Fixed getting an incorrect `maxMessageSize` value when accessing multiple clusters in the same process [pulsar]
chenhongSZ commented on PR #22306:
URL: https://github.com/apache/pulsar/pull/22306#issuecomment-2011306009

/pulsarbot run-failure-checks