Re: [PR] [improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder [pulsar]
lhotari commented on code in PR #22541: URL: https://github.com/apache/pulsar/pull/22541#discussion_r1574241817

## pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java:

```
@@ -336,4 +336,30 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map
+ * By default, the connection pool maintains up to 16 connections to a single host. This method allows you to
+ * modify this default behavior and limit the number of connections.
+ *
+ * This setting can be useful in scenarios where you want to limit the resources used by the client library,
+ * or control the level of parallelism for operations so that a single client does not overwhelm
+ * the Pulsar cluster with too many concurrent connections.
+ *
+ * @param connectionsPerHost the maximum number of connections to establish per host. Set to <= 0 to disable
+ *                           the limit.
+ * @return the PulsarAdminBuilder instance, allowing for method chaining
+ */
+PulsarAdminBuilder connectionsPerHost(int connectionsPerHost);
```

Review Comment: makes sense. renamed

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
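The "set to <= 0 to disable the limit" semantics described in the javadoc can be illustrated with a small stdlib-only sketch; the helper `effectiveConnectionLimit` below is hypothetical and not part of the Pulsar API:

```java
// Hypothetical illustration of the documented semantics: a configured
// value <= 0 disables the per-host cap; otherwise the value is the cap.
public class ConnectionLimit {
    // Default from the javadoc: up to 16 connections per host.
    static final int DEFAULT_MAX_CONNECTIONS_PER_HOST = 16;

    // Not a Pulsar API: models how a <= 0 setting removes the limit.
    static int effectiveConnectionLimit(int configured) {
        return configured <= 0 ? Integer.MAX_VALUE : configured;
    }

    public static void main(String[] args) {
        System.out.println(effectiveConnectionLimit(8));  // explicit cap
        System.out.println(effectiveConnectionLimit(0));  // limit disabled
        System.out.println(effectiveConnectionLimit(-1)); // limit disabled
    }
}
```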
Re: [I] The partitioned topic startMessageId is illegal [pulsar]
RogerHuangZB commented on issue #9784: URL: https://github.com/apache/pulsar/issues/9784#issuecomment-2068707786

I want to know whether this will be resolved, or whether reading a partitioned topic this way is not being considered.
Re: [PR] [fix][broker] Create new ledger after the current ledger is closed [pulsar]
lhotari commented on PR #22034: URL: https://github.com/apache/pulsar/pull/22034#issuecomment-2068749934

In branch-3.0 this change makes ClusterMigrationTest fail consistently with an NPE. Example: https://github.com/apache/pulsar/actions/runs/8778991470/job/24086503436#step:10:1557

```
Error:  org.apache.pulsar.broker.service.ClusterMigrationTest.testClusterMigration[true, Shared](4)  Time elapsed: 87.394 s  <<< FAILURE!
org.apache.pulsar.client.api.PulsarClientException$BrokerPersistenceException: {"errorMsg":"org.apache.bookkeeper.mledger.ManagedLedgerException$MetaStoreException: java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null","reqId":1758475176444131959, "remote":"localhost/127.0.0.1:43465", "local":"/127.0.0.1:35162"}
 at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1083)
 at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:89)
 at org.apache.pulsar.broker.service.ClusterMigrationTest.testClusterMigration(ClusterMigrationTest.java:352)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:568)
 at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
 at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
 at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
 at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
 at java.base/java.lang.Thread.run(Thread.java:840)
```
Re: [PR] [fix][broker] Support lookup options for extensible load manager [pulsar]
BewareMyPower commented on code in PR #22487: URL: https://github.com/apache/pulsar/pull/22487#discussion_r1574488674

## tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java:

```
@@ -405,7 +405,7 @@ public void testIsolationPolicy() throws Exception {
     admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS);
```

Review Comment: Should you add `fail()` here?

## tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java:

```
@@ -405,7 +405,7 @@ public void testIsolationPolicy() throws Exception {
     admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS);
 } catch (Exception ex) {
     log.error("Failed to lookup topic: ", ex);
```

Review Comment: It seems that the lookup error could be `TimeoutException` before retrying until the error becomes "Service Unavailable". Is that expected?
Re: [PR] [fix][broker] Support lookup options for extensible load manager [pulsar]
Demogorgon314 commented on code in PR #22487: URL: https://github.com/apache/pulsar/pull/22487#discussion_r1574550415

## tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java:

```
@@ -405,7 +405,7 @@ public void testIsolationPolicy() throws Exception {
     admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS);
 } catch (Exception ex) {
     log.error("Failed to lookup topic: ", ex);
```

Review Comment: This PR should not cause the `TimeoutException`. I think we should merge this PR first; then I will try to find the root cause and push another PR to fix it.
Re: [PR] [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 [pulsar]
nikhil-ctds commented on PR #0: URL: https://github.com/apache/pulsar/pull/0#issuecomment-2069085315

@lhotari With the above fix I tried testing the GCS offloader with ledgers of size >= 2.5gb. I'm still facing the same error:

```
Error in offload
null

Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.jclouds.http.HttpResponseException: command: POST https://www.googleapis.com/storage/v1/b/bucket-cognitree-ls1258/o/d7f69abd-dc45-4fc1-bbe3-bd9daaa8fea7-ledger-9/compose HTTP/1.1 failed with response: HTTP/1.1 400 Bad Request; content: [{
  "error": {
    "code": 400,
    "message": "The number of source components provided (38) exceeds the maximum (32)",
    "errors": [
      {
        "message": "The number of source components provided (38) exceeds the maximum (32)",
        "domain": "global",
        "reason": "invalid"
      }
    ]
  }
}
]
```

I have also written a simple program using Jclouds 2.6.0 to upload a large 3gb file to a GCS bucket; it uploaded successfully.
Re: [PR] [fix][broker] Support lookup options for extensible load manager [pulsar]
Demogorgon314 commented on code in PR #22487: URL: https://github.com/apache/pulsar/pull/22487#discussion_r1574549415

## tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java:

```
@@ -405,7 +405,7 @@ public void testIsolationPolicy() throws Exception {
     admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS);
```

Review Comment: Yes, updated.
Re: [PR] [improve][test] Add topic policy test for topic API [pulsar]
Technoboy- merged PR #22546: URL: https://github.com/apache/pulsar/pull/22546
(pulsar) branch master updated: [improve][test] Add topic policy test for topic API (#22546)
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 3a0f908e80d  [improve][test] Add topic policy test for topic API (#22546)

3a0f908e80d is described below

commit 3a0f908e80d0863920a1258362fd782e95fe8f17
Author: Jiwei Guo
AuthorDate: Mon Apr 22 19:47:03 2024 +0800

    [improve][test] Add topic policy test for topic API (#22546)

```
 .../org/apache/pulsar/broker/admin/AuthZTest.java  |  113 ++
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 1121 ++--
 .../admin/TransactionAndSchemaAuthZTest.java       |  359 +++
 3 files changed, 1270 insertions(+), 323 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java
new file mode 100644
index 000..a710a03970d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static org.mockito.Mockito.doReturn;
+
+public class AuthZTest extends MockedPulsarStandalone {
+
+    protected PulsarAdmin superUserAdmin;
+
+    protected PulsarAdmin tenantManagerAdmin;
+
+    protected AuthorizationService authorizationService;
+
+    protected AuthorizationService orignalAuthorizationService;
+
+    protected static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString();
+    protected static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+    @BeforeMethod(alwaysRun = true)
+    public void before() throws IllegalAccessException {
+        orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService();
+        authorizationService = Mockito.spy(orignalAuthorizationService);
+        FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService",
+                authorizationService, true);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void after() throws IllegalAccessException {
+        FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService",
+                orignalAuthorizationService, true);
+    }
+
+    protected AtomicBoolean setAuthorizationTopicOperationChecker(String role, Object operation) {
+        AtomicBoolean execFlag = new AtomicBoolean(false);
+        if (operation instanceof TopicOperation) {
+            Mockito.doAnswer(invocationOnMock -> {
+                String role_ = invocationOnMock.getArgument(2);
+                if (role.equals(role_)) {
+                    TopicOperation operation_ = invocationOnMock.getArgument(1);
+                    Assert.assertEquals(operation_, operation);
+                }
+                execFlag.set(true);
+                return invocationOnMock.callRealMethod();
+            }).when(authorizationService).allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(),
+                    Mockito.any(), Mockito.any());
+        } else if (operation instanceof NamespaceOperation) {
+            doReturn(true)
+                    .when(authorizationService).isValidOriginalPrincipal(Mockito.any(), Mockito.any(), Mockito.any());
+            Mockito.doAnswer(invocationOnMock -> {
+                String role_ = invocationOnMock.getArgument(2);
```
[PR] [fix][admin] Fix can't delete tenant for v1 [pulsar]
Technoboy- opened a new pull request, #22550: URL: https://github.com/apache/pulsar/pull/22550

### Motivation

Tenants cannot be deleted for v1. Found this bug while fixing #22547.

```
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException$ContentDeserializationException: Failed to deserialize payload for key '/admin/policies/p1/c1'
 at org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl.lambda$readValueFromStore$0(MetadataCacheImpl.java:115)
 at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
 ... 12 more
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source: (byte[])""; line: 1, column: 0]
 at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1746)
 at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:360)
 at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2095)
 at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1583)
 at org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType.deserialize(JSONMetadataSerdeSimpleType.java:46)
 at org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl.lambda$readValueFromStore$0(MetadataCacheImpl.java:111)
 ... 13 more
2024-04-22T20:24:34,580 - INFO - [configuration-metadata-store-4-1:ResourceGroupNamespaceConfigListener] - Metadata store notification: Path /admin/policies/p1/c1, Type Deleted
2024-04-22T20:24:34,594 - ERROR - [metadata-store-2-1:TenantsBase] - [pass.pass] Failed to delete tenant p1
org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode = Directory not empty for /managed-ledgers/p1
 at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:476) ~[classes/:?]
 at org.apache.pulsar.metadata.impl.ZKMetadataStore.handleDeleteResult(ZKMetadataStore.java:304) ~[classes/:?]
 at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$5(ZKMetadataStore.java:216) ~[classes/:?]
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
 at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
 at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
 at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.108.Final.jar:4.1.108.Final]
 at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode = Directory not empty for /managed-ledgers/p1
```

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
Re: [PR] [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 [pulsar]
lhotari commented on PR #0: URL: https://github.com/apache/pulsar/pull/0#issuecomment-2069324241

> @lhotari With the above fix I tried testing GCS offloader with ledgers of size >= 2.5gb. I'm still facing the same error
>
> ```
> Error in offload
> null
>
> Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.jclouds.http.HttpResponseException: command: POST https://www.googleapis.com/storage/v1/b/bucket-cognitree-ls1258/o/d7f69abd-dc45-4fc1-bbe3-bd9daaa8fea7-ledger-9/compose HTTP/1.1 failed with response: HTTP/1.1 400 Bad Request; content: [{
>   "error": {
>     "code": 400,
>     "message": "The number of source components provided (38) exceeds the maximum (32)",
>     "errors": [
>       {
>         "message": "The number of source components provided (38) exceeds the maximum (32)",
>         "domain": "global",
>         "reason": "invalid"
>       }
>     ]
>   }
> }
> ]
> ```
>
> I have also written a simple code using Jclouds 2.6.0 to upload a large file of size 3gb to GCS bucket, it has uploaded successfully.

@nikhil-ctds Do you have a chance to rerun the test with `gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728`? The default value `gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864` limits the max upload size to 2048MiB (64MiB * 32). Doubling gcsManagedLedgerOffloadMaxBlockSizeInBytes to `134217728` should lift the limit to 4096MiB. Can you please verify? Thanks
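The arithmetic behind this suggestion is worth spelling out: GCS compose accepts at most 32 source components, so the maximum offloadable ledger size is the block size times 32. A quick stdlib-only check of the two configurations mentioned above:

```java
public class GcsComposeLimit {
    // GCS "compose" accepts at most 32 source components per request,
    // which is the limit reported in the 400 error above.
    static final int MAX_COMPOSE_COMPONENTS = 32;

    // Maximum offloadable size for a given block size, in bytes.
    static long maxOffloadBytes(long blockSizeBytes) {
        return blockSizeBytes * MAX_COMPOSE_COMPONENTS;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Default gcsManagedLedgerOffloadMaxBlockSizeInBytes = 67108864 (64 MiB).
        System.out.println(maxOffloadBytes(64 * mib) / mib);  // 2048 MiB cap
        // Doubled to 134217728 (128 MiB): enough headroom for a ~2.5 GB ledger.
        System.out.println(maxOffloadBytes(128 * mib) / mib); // 4096 MiB cap
    }
}
```

A 2.5 GB ledger split into 64 MiB blocks yields about 40 components, which also matches the "38 exceeds the maximum (32)" message in the report.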
[PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]
coderzc opened a new pull request, #22552: URL: https://github.com/apache/pulsar/pull/22552

### Motivation

Since the managed ledger does not create a new ledger when recovering a terminated managed ledger, getValidPositionAfterSkippedEntries hits an NPE:

```
org.apache.bookkeeper.mledger.ManagedLedgerException$MetaStoreException: java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null
 at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
 at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
 at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) ~[?:?]
 at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
 at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:201) [bookkeeper-common-4.16.4.jar:4.16.4]
 at org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46) [bookkeeper-common-4.16.4.jar:4.16.4]
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
 at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
 at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.108.Final.jar:4.1.108.Final]
 at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null
```

https://github.com/apache/pulsar/blob/3a0f908e80d0863920a1258362fd782e95fe8f17/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L486-L492
https://github.com/apache/pulsar/blob/3a0f908e80d0863920a1258362fd782e95fe8f17/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L3698-L3712

### Modifications

If `currentLedger == null`, return `lastConfirmedEntry.getNext()` from getValidPositionAfterSkippedEntries to avoid the NPE.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*
This change is a trivial rework / code cleanup without any test coverage.
*(or)*
This change is already covered by existing tests, such as *(please describe tests)*.
*(or)*
This change added tests and can be verified as follows: *(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
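The proposed null guard can be sketched with self-contained stand-ins; the `Position` type below is a simplified stub, not the BookKeeper class, and the positions used are arbitrary example values:

```java
// Simplified stand-ins to illustrate the proposed guard in
// getValidPositionAfterSkippedEntries; not the real implementation.
class Position {
    final long ledgerId, entryId;
    Position(long ledgerId, long entryId) { this.ledgerId = ledgerId; this.entryId = entryId; }
    Position getNext() { return new Position(ledgerId, entryId + 1); }
    public String toString() { return ledgerId + ":" + entryId; }
}

public class ValidPositionSketch {
    Object currentLedger;                              // null after recovering a terminated ledger
    Position lastConfirmedEntry = new Position(9, 41); // arbitrary example value

    Position getValidPositionAfterSkippedEntries(Position candidate) {
        // Proposed guard: a terminated managed ledger has no currentLedger,
        // so fall back to the entry after the last confirmed one instead of
        // dereferencing currentLedger.getId() (the NPE in the report).
        if (currentLedger == null) {
            return lastConfirmedEntry.getNext();
        }
        return candidate; // placeholder for the existing validation logic
    }

    public static void main(String[] args) {
        ValidPositionSketch ml = new ValidPositionSketch();
        System.out.println(ml.getValidPositionAfterSkippedEntries(new Position(9, 40))); // 9:42
    }
}
```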
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574733176

## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:

```
@@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
 private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
     getNamespaceReplicatedClustersAsync(namespaceName)
-        .thenAccept(clusters -> {
-            for (String cluster : clusters) {
-                if (!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-                    // this call happens in the background without async composition. completion is logged.
-                    pulsar().getPulsarResources().getClusterResources()
-                            .getClusterAsync(cluster)
-                            .thenCompose(clusterDataOp ->
-                                    ((TopicsImpl) pulsar().getBrokerService()
-                                            .getClusterPulsarAdmin(cluster, clusterDataOp).topics())
-                                            .createPartitionedTopicAsync(
-                                                    topicName.getPartitionedTopicName(),
-                                                    numPartitions,
-                                                    true, null))
-                            .whenComplete((__, ex) -> {
-                                if (ex != null) {
-                                    log.error(
-                                            "[{}] Failed to create partitioned topic {} in cluster {}.",
-                                            clientAppId(), topicName, cluster, ex);
-                                } else {
-                                    log.info(
-                                            "[{}] Successfully created partitioned topic {} in "
-                                                    + "cluster {}",
-                                            clientAppId(), topicName, cluster);
-                                }
-                            });
-                }
+        .thenAccept(clusters -> {
+            // this call happens in the background without async composition. completion is logged.
+            internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
+        });
+}
+
+protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground(
+        Set clusters, int numPartitions) {
+    final String shortTopicName = topicName.getPartitionedTopicName();
+    Map> tasksForAllClusters = new HashMap<>();
+    for (String cluster : clusters) {
+        if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+            continue;
+        }
+        ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources();
+        CompletableFuture createRemoteTopicFuture = new CompletableFuture<>();
+        tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+        clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+            if (ex1 != null) {
+                // Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck.
+                log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster"
+                        + " {}.", clientAppId(), topicName, cluster, ex1);
+                createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
+                return;
+            }
+            // Get cluster data success.
+            TopicsImpl topics =
+                    (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics();
+            topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
+                    .whenComplete((ignore, ex2) -> {
+                        if (ex2 == null) {
+                            // Create success.
+                            log.info("[{}] Successfully created partitioned topic {} in cluster {}",
+                                    clientAppId(), topicName, cluster);
+                            createRemoteTopicFuture.complete(null);
+                            return;
+                        }
+                        // Create topic on the remote cluster error.
+                        Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2);
+                        // The topic has been created before, che
```
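The pattern in the new code above — one future per remote cluster, collected into a map so callers can await or inspect each cluster's outcome independently — can be sketched with stdlib types only; the cluster names and the `createTopicOn` step are placeholders for the real admin call:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class PerClusterTasksSketch {
    // Placeholder for the remote-cluster admin call; always succeeds here.
    static CompletableFuture<Void> createTopicOn(String cluster) {
        return CompletableFuture.completedFuture(null);
    }

    // One future per remote cluster, keyed by cluster name, mirroring the
    // Map<cluster, CompletableFuture> returned by the method in the diff.
    static Map<String, CompletableFuture<Void>> createOnClusters(
            List<String> clusters, String localCluster) {
        Map<String, CompletableFuture<Void>> tasks = new HashMap<>();
        for (String cluster : clusters) {
            if (cluster.equals(localCluster)) {
                continue; // skip the local cluster, as the diff does
            }
            CompletableFuture<Void> task = new CompletableFuture<>();
            tasks.put(cluster, task);
            // Complete the per-cluster future from the async call, catching
            // failures so the future never stays stuck.
            createTopicOn(cluster).whenComplete((ignore, ex) -> {
                if (ex != null) {
                    task.completeExceptionally(ex);
                } else {
                    task.complete(null);
                }
            });
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> tasks =
                createOnClusters(List.of("east", "west", "local"), "local");
        System.out.println(tasks.keySet()); // two entries: local cluster skipped
    }
}
```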
Re: [PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]
shibd commented on code in PR #22552: URL: https://github.com/apache/pulsar/pull/22552#discussion_r1574746976

## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:

```
@@ -4695,5 +4695,58 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
     && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());
 }
+
+    @Test
+    public void testRecoverCursorWithTerminateManagedLedger() throws Exception {
+        String mlName = "my_test_ledger";
+        String cursorName = "c1";
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
+
+        // Write some data.
+        Position p0 = ledger.addEntry("entry-0".getBytes());
+        Position p1 = ledger.addEntry("entry-1".getBytes());
+
+        // Read and ack message.
+        List entries = c1.readEntries(2);
+        assertEquals(entries.size(), 2);
+        assertEquals(entries.get(0).getPosition(), p0);
+        assertEquals(entries.get(1).getPosition(), p1);
+        entries.forEach(Entry::release);
+
+        // Mark delete the last message.
+        c1.markDelete(p1);
+        Position markDeletedPosition = c1.getMarkDeletedPosition();
+        Assert.assertEquals(markDeletedPosition, p1);
+
+        // Terminate the managed ledger.
+        Position lastPosition = ledger.terminate();
+        assertEquals(lastPosition, p1);
+
+        // Close the ledger.
+        ledger.close();
+
+        // Reopen the ledger.
+        ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+        BookKeeper mockBookKeeper = mock(BookKeeper.class);
+        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ledger,
+                cursorName);
+
+        // Recover the cursor.
+        cursor.recover(new VoidCallback() {
```

Review Comment: Need to convert to a sync call.
[PR] [improve][broker] Make BrokerSelectionStrategy pluggable [pulsar]
BewareMyPower opened a new pull request, #22553: URL: https://github.com/apache/pulsar/pull/22553 ### Motivation When users want to extend the `ExtensibleLoadManagerImpl`, the `BrokerSelectionStrategy` cannot be customized, so users have to write a lot of duplicated code for a customized broker selection strategy. ### Modifications Add a stable interface `BrokerSelectionStrategyFactory` and implement it in `ExtensibleLoadManagerImpl`, whose default implementation returns a `LeastResourceUsageWithWeight` instance. Add `CustomBrokerSelectionStrategyTest` to show how to customize the broker selection strategy and verify that it works. ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [x] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
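For readers who want a feel for what such a pluggable strategy could look like, here is a minimal, self-contained Java sketch of the factory pattern the PR describes. The interface and class names echo those mentioned in the PR description, but the signatures here are hypothetical stand-ins defined locally — they are not Pulsar's actual interfaces.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class CustomStrategySketch {

    /** Hypothetical stand-in for a broker selection strategy (not Pulsar's real interface). */
    interface BrokerSelectionStrategy {
        Optional<String> select(List<String> brokers, Map<String, Double> loadByBroker);
    }

    /** Hypothetical stand-in for the factory interface the PR adds. */
    interface BrokerSelectionStrategyFactory {
        BrokerSelectionStrategy createBrokerSelectionStrategy();
    }

    /** A custom factory whose strategy picks the broker with the lowest reported load. */
    static class LeastLoadedFactory implements BrokerSelectionStrategyFactory {
        @Override
        public BrokerSelectionStrategy createBrokerSelectionStrategy() {
            return (brokers, load) -> brokers.stream()
                    .min(Comparator.comparingDouble(
                            (String b) -> load.getOrDefault(b, Double.MAX_VALUE)));
        }
    }

    static String pick() {
        BrokerSelectionStrategy strategy =
                new LeastLoadedFactory().createBrokerSelectionStrategy();
        return strategy.select(
                List.of("broker-1", "broker-2", "broker-3"),
                Map.of("broker-1", 0.8, "broker-2", 0.2, "broker-3", 0.5)).orElseThrow();
    }

    public static void main(String[] args) {
        // broker-2 has the lowest load, so it is selected
        System.out.println(pick());
    }
}
```

The point of the factory indirection is exactly what the motivation section states: a user plugs in one small factory class instead of duplicating the whole load-manager wiring.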
Re: [PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]
shibd commented on code in PR #22552: URL: https://github.com/apache/pulsar/pull/22552#discussion_r1574746976 ## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java: ## @@ -4695,5 +4695,58 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); } +@Test +public void testRecoverCursorWithTerminateManagedLedger() throws Exception { +String mlName = "my_test_ledger"; +String cursorName = "c1"; + +ManagedLedgerConfig config = new ManagedLedgerConfig(); +ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config); +ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName); + +// Write some data. +Position p0 = ledger.addEntry("entry-0".getBytes()); +Position p1 = ledger.addEntry("entry-1".getBytes()); + +// Read and ack message. +List entries = c1.readEntries(2); +assertEquals(entries.size(), 2); +assertEquals(entries.get(0).getPosition(), p0); +assertEquals(entries.get(1).getPosition(), p1); +entries.forEach(Entry::release); + +// Mark delete the last message. +c1.markDelete(p1); +Position markDeletedPosition = c1.getMarkDeletedPosition(); +Assert.assertEquals(markDeletedPosition, p1); + +// Terminate the managed ledger. +Position lastPosition = ledger.terminate(); +assertEquals(lastPosition, p1); + +// Close the ledger. +ledger.close(); + +// Reopen the ledger. +ledger = (ManagedLedgerImpl) factory.open(mlName, config); +BookKeeper mockBookKeeper = mock(BookKeeper.class); +final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ledger, +cursorName); + +// Recover the cursor. +cursor.recover(new VoidCallback() { Review Comment: Need to convert to a sync call and to assert recover success. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
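The reviewer's suggestion — turning the callback-based `recover()` into a synchronous call that also asserts success — is commonly done with a latch. Below is a minimal, self-contained Java sketch of that conversion; the `VoidCallback` interface and `recover` method here are simplified stand-ins, not the actual managed-ledger API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SyncRecoverSketch {

    /** Simplified stand-in for the async callback style used by the cursor's recover(). */
    interface VoidCallback {
        void operationComplete();
        void operationFailed(Exception e);
    }

    /** Pretend async recover that completes successfully on another thread. */
    static void recover(VoidCallback cb) {
        new Thread(cb::operationComplete).start();
    }

    /** Blocks until recover completes; throws if it failed or timed out. */
    static void recoverSync() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Exception> failure = new AtomicReference<>();
        recover(new VoidCallback() {
            @Override public void operationComplete() { latch.countDown(); }
            @Override public void operationFailed(Exception e) { failure.set(e); latch.countDown(); }
        });
        if (!latch.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("recover timed out");
        }
        if (failure.get() != null) {
            throw failure.get();
        }
    }

    public static void main(String[] args) throws Exception {
        recoverSync(); // returning without an exception asserts that recovery succeeded
        System.out.println("recover succeeded");
    }
}
```

In a test, the synchronous wrapper makes the assertion trivial: if `recoverSync()` returns, recovery succeeded; any failure surfaces as a thrown exception instead of being silently lost in a callback.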
Re: [I] Offloading large ledgers (>2GB) fail with Google Cloud Storage [pulsar]
lhotari commented on issue #15159: URL: https://github.com/apache/pulsar/issues/15159#issuecomment-2069410440 #0 alone doesn't fix the issue. Created #22554 to fix the remaining issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] Offloading large ledgers (>2GB) fail with Google Cloud Storage [pulsar]
lhotari opened a new issue, #15159: URL: https://github.com/apache/pulsar/issues/15159 **Describe the bug** Offloading large ledgers fails with Google Cloud Storage and the default settings. With the default settings, offloading ledgers over 2GB to GCS will fail because of a limitation in JClouds (reported as [[JCLOUDS-1606] Cannot upload more than 32 parts to GCS](https://issues.apache.org/jira/browse/JCLOUDS-1606)) that caps a multipart upload at 32 parts. GCS supports multipart uploads up to 1 parts, but JClouds doesn't use the API in a way that achieves more than 32 parts. Here's an example log entry of the problem:
```
java.util.concurrent.CompletionException: org.jclouds.http.HttpResponseException: command: POST https://www.googleapis.com/storage/v1/b/somebucket/o/ff553922-1fa3-4ceb-abcd-60106603b5c8-object-123456/compose HTTP/1.1 failed with response: HTTP/1.1 400 Bad Request; content: [{ "error": { "code": 400, "message": "The number of source components provided (35) exceeds the maximum (32)", "errors": [ { "message": "The number of source components provided (35) exceeds the maximum (32)", "domain": "global", "reason": "invalid" } ] } } ]
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.lambda$offload$0(BlobStoreManagedLedgerOffloader.java:237) ~[?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131) [com.google.guava-guava-31.0.1-jre.jar:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.lambda$offload$0(BlobStoreManagedLedgerOffloader.java:237) ~[?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131) [com.google.guava-guava-31.0.1-jre.jar:?]
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74) [com.google.guava-guava-31.0.1-jre.jar:?]
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82) [com.google.guava-guava-31.0.1-jre.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.jclouds.http.HttpResponseException: command: POST https://www.googleapis.com/storage/v1/b/somebucket/o/ff553922-1fa3-4ceb-abcd-60106603b5c8-object-123456/compose HTTP/1.1 failed with response: HTTP/1.1 400 Bad Request; content: [{ "error": { "code": 400, "message": "The number of source components provided (35) exceeds the maximum (32)", "errors": [ { "message": "The number of source components provided (35) exceeds the maximum (32)", "domain": "global", "reason": "invalid" } ] } } ]
	at org.jclouds.googlecloudstorage.handlers.GoogleCloudStorageErrorHandler.handleError(GoogleCloudStorageErrorHandler.java:40) ~[?:?]
	at org.jclouds.http.handlers.DelegatingErrorHandler.handleError(DelegatingErrorHandler.java:65) ~[?:?]
	at org.jclouds.http.internal.BaseHttpCommandExecutorService.shouldContinue(BaseHttpCommandExecutorService.java:138) ~[?:?]
	at org.jclouds.http.internal.BaseHttpCommandExecutorService.invoke(BaseHttpCommandExecutorS
```
Re: [PR] [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 [pulsar]
lhotari commented on PR #0: URL: https://github.com/apache/pulsar/pull/0#issuecomment-2069414827 @nikhil-ctds I created #22554 that changes the default value of `gcsManagedLedgerOffloadMaxBlockSizeInBytes`. It would be helpful if you could test that `gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728` fixes the issue. thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
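The arithmetic behind the suggested setting: JClouds caps a GCS multipart upload at 32 parts, so the largest offloadable ledger is 32 times the configured block size. A quick sketch of that calculation (the 32-part constant comes from JCLOUDS-1606; the byte values are the old and suggested defaults for `gcsManagedLedgerOffloadMaxBlockSizeInBytes`):

```java
public class OffloadLimitSketch {

    /** Max ledger size offloadable to GCS = block size x JClouds' 32-part cap. */
    static long maxLedgerBytes(long blockSizeBytes) {
        final int MAX_GCS_PARTS = 32; // JClouds limit reported in JCLOUDS-1606
        return blockSizeBytes * MAX_GCS_PARTS;
    }

    public static void main(String[] args) {
        long oldDefault = 64L * 1024 * 1024;   // 64 MiB, the previous default
        long newDefault = 134_217_728L;        // 128 MiB, the value suggested in the comment
        System.out.println(maxLedgerBytes(oldDefault) / (1024 * 1024) + " MiB"); // 2048 MiB
        System.out.println(maxLedgerBytes(newDefault) / (1024 * 1024) + " MiB"); // 4096 MiB
    }
}
```

This matches the numbers in the follow-up PR's title ("Increase file upload limit from 2048MiB to 4096MiB"): doubling the block size doubles the maximum ledger size that fits under the 32-part cap.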
Re: [PR] [improve] add a lint-docker command in makefile [pulsar-client-go]
BewareMyPower merged PR #1207: URL: https://github.com/apache/pulsar-client-go/pull/1207 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-client-go) branch master updated: [improve] add a lint-docker command in makefile (#1207)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 13ecca24 [improve] add a lint-docker command in makefile (#1207) 13ecca24 is described below

commit 13ecca24ffb6b65ac7ec36da53ecb67479518647
Author: zhou zhuohan <843520...@qq.com>
AuthorDate: Mon Apr 22 21:23:49 2024 +0800

    [improve] add a lint-docker command in makefile (#1207)

    Co-authored-by: ninjazhou
---
 Makefile | 6 ++
 1 file changed, 6 insertions(+)

```
diff --git a/Makefile b/Makefile
index df4d539d..cdae8a59 100644
--- a/Makefile
+++ b/Makefile
@@ -38,6 +38,12 @@ lint: bin/golangci-lint
 bin/golangci-lint:
 	GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2
 
+# an alternative to above `make lint` command
+# use golangCi-lint docker to avoid local golang env issues
+# https://golangci-lint.run/welcome/install/
+lint-docker:
+	docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.51.2 golangci-lint run -v
+
 container:
 	docker build -t ${IMAGE_NAME} \
 		--build-arg GO_VERSION="${GO_VERSION}" \
```
Re: [PR] [Improve] Add admin api GetLeaderBroker [pulsar-client-go]
BewareMyPower merged PR #1203: URL: https://github.com/apache/pulsar-client-go/pull/1203 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-client-go) branch master updated: [Improve] Add admin api GetLeaderBroker (#1203)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new b4d45cd3 [Improve] Add admin api GetLeaderBroker (#1203) b4d45cd3 is described below

commit b4d45cd360599b52e3f160db9d39d619889c556a
Author: crossoverJie
AuthorDate: Mon Apr 22 21:25:00 2024 +0800

    [Improve] Add admin api GetLeaderBroker (#1203)

    ### Motivation
    To keep consistent with the [Java client](https://github.com/apache/pulsar/pull/9799).

    ### Modifications
    Add `GetLeaderBroker` interface.
---
 pulsaradmin/pkg/admin/brokers.go | 12 
 pulsaradmin/pkg/admin/brokers_test.go | 16 
 pulsaradmin/pkg/utils/data.go | 5 +
 3 files changed, 33 insertions(+)

```
diff --git a/pulsaradmin/pkg/admin/brokers.go b/pulsaradmin/pkg/admin/brokers.go
index e178610c..650fab8e 100644
--- a/pulsaradmin/pkg/admin/brokers.go
+++ b/pulsaradmin/pkg/admin/brokers.go
@@ -58,6 +58,9 @@ type Brokers interface {
 	// HealthCheckWithTopicVersion run a health check on the broker
 	HealthCheckWithTopicVersion(utils.TopicVersion) error
+
+	// GetLeaderBroker get the information of the leader broker.
+	GetLeaderBroker() (utils.BrokerInfo, error)
 }
 
 type broker struct {
@@ -162,3 +165,12 @@ func (b *broker) HealthCheckWithTopicVersion(topicVersion utils.TopicVersion) er
 	}
 	return nil
 }
+
+func (b *broker) GetLeaderBroker() (utils.BrokerInfo, error) {
+	endpoint := b.pulsar.endpoint(b.basePath, "/leaderBroker")
+	var brokerInfo utils.BrokerInfo
+	err := b.pulsar.Client.Get(endpoint, &brokerInfo)
+	if err != nil {
+		return brokerInfo, err
+	}
+	return brokerInfo, nil
+}
diff --git a/pulsaradmin/pkg/admin/brokers_test.go b/pulsaradmin/pkg/admin/brokers_test.go
index d48ce7cb..97679759 100644
--- a/pulsaradmin/pkg/admin/brokers_test.go
+++ b/pulsaradmin/pkg/admin/brokers_test.go
@@ -42,3 +42,19 @@ func TestBrokerHealthCheckWithTopicVersion(t *testing.T) {
 	err = admin.Brokers().HealthCheckWithTopicVersion(utils.TopicVersionV2)
 	assert.NoError(t, err)
 }
+
+func TestGetLeaderBroker(t *testing.T) {
+	readFile, err := os.ReadFile("../../../integration-tests/tokens/admin-token")
+	assert.NoError(t, err)
+	cfg := &config.Config{
+		Token: string(readFile),
+	}
+	admin, err := New(cfg)
+	assert.NoError(t, err)
+	assert.NotNil(t, admin)
+	leaderBroker, err := admin.Brokers().GetLeaderBroker()
+	assert.NoError(t, err)
+	assert.NotNil(t, leaderBroker)
+	assert.NotEmpty(t, leaderBroker.ServiceURL)
+	assert.NotEmpty(t, leaderBroker.BrokerID)
+}
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 1e67e3c7..61607912 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -477,6 +477,11 @@ type GetStatsOptions struct {
 	ExcludeConsumers bool `json:"exclude_consumers"`
 }
 
+type BrokerInfo struct {
+	BrokerID   string `json:"brokerId"`
+	ServiceURL string `json:"serviceUrl"`
+}
+
 type TopicVersion string
 
 const (
```
Re: [PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]
coderzc commented on code in PR #22552: URL: https://github.com/apache/pulsar/pull/22552#discussion_r1574766383 ## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java: ## @@ -4695,5 +4695,58 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); } +@Test +public void testRecoverCursorWithTerminateManagedLedger() throws Exception { +String mlName = "my_test_ledger"; +String cursorName = "c1"; + +ManagedLedgerConfig config = new ManagedLedgerConfig(); +ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config); +ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName); + +// Write some data. +Position p0 = ledger.addEntry("entry-0".getBytes()); +Position p1 = ledger.addEntry("entry-1".getBytes()); + +// Read and ack message. +List entries = c1.readEntries(2); +assertEquals(entries.size(), 2); +assertEquals(entries.get(0).getPosition(), p0); +assertEquals(entries.get(1).getPosition(), p1); +entries.forEach(Entry::release); + +// Mark delete the last message. +c1.markDelete(p1); +Position markDeletedPosition = c1.getMarkDeletedPosition(); +Assert.assertEquals(markDeletedPosition, p1); + +// Terminate the managed ledger. +Position lastPosition = ledger.terminate(); +assertEquals(lastPosition, p1); + +// Close the ledger. +ledger.close(); + +// Reopen the ledger. +ledger = (ManagedLedgerImpl) factory.open(mlName, config); +BookKeeper mockBookKeeper = mock(BookKeeper.class); +final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ledger, +cursorName); + +// Recover the cursor. +cursor.recover(new VoidCallback() { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] 3.0 test [pulsar]
github-actions[bot] commented on PR #22555: URL: https://github.com/apache/pulsar/pull/22555#issuecomment-2069570265 @coderzc Please add the following content to your PR description and select a checkbox: ``` - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] 3.0 test [pulsar]
coderzc opened a new pull request, #22555: URL: https://github.com/apache/pulsar/pull/22555 Fixes #xyz Main Issue: #xyz PIP: #xyz ### Motivation ### Modifications ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] 3.0 test [pulsar]
coderzc closed pull request #22555: 3.0 test URL: https://github.com/apache/pulsar/pull/22555 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [D] broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly [pulsar]
GitHub user wallacepeng edited a discussion: broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly

```
2024-04-21T03:14:20,474+ [pulsar-io-4-7] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck] [null] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger","reqId":3493733038286036253, "remote":"pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650", "local":"/172.20.203.179:58124"}
2024-04-21T03:14:20,474+ [pulsar-io-4-7] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xdce5010a, L:/172.20.203.179:58124 - R:pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger
```

This is caused by the internal healthcheck being unable to create a producer (it seems to have a null producerName) for the topic persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck. It reports "No such ledger exists on Metadata Server". Everything else works except the healthcheck. I have BookKeeper clusters; I decommissioned the default one and then got this error. Does it still try to create the ledger on the decommissioned bookie?

```
bin/pulsar-admin topics stats-internal persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck
No such ledger exists on Metadata Server
Reason: No such ledger exists on Metadata Server
```

Screenshot: https://github.com/apache/pulsar/assets/894641/9eb33a0f-48f4-4ff4-8f80-88ea5c7cc1d7

The correct log from another Pulsar cluster looks like: https://github.com/apache/pulsar/assets/894641/90c104d9-b351-4212-b26b-41dc170a18e4

GitHub link: https://github.com/apache/pulsar/discussions/22545 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]
BewareMyPower commented on code in PR #1208: URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1574823910 ## pulsar/internal/lookup_service.go: ## @@ -30,8 +30,9 @@ import ( // LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr. type LookupResult struct { - LogicalAddr *url.URL - PhysicalAddr *url.URL + IsProxyThroughServiceURL bool Review Comment: It seems `IsProxyThroughServiceURL` is never used? ## pulsar/producer_partition.go: ## @@ -373,10 +390,14 @@ func (p *partitionProducer) GetBuffer() internal.Buffer { return b } -func (p *partitionProducer) ConnectionClosed() { +func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProducer) { Review Comment: I see `ConnectionClosed(nil)` could be called in `connection.go`, should you perform the null check here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] PKIX path building failed: unable to find valid certification path to request to requested target [pulsar-helm-chart]
bharatbhushan1705 opened a new issue, #492: URL: https://github.com/apache/pulsar-helm-chart/issues/492 **Describe the bug** A clear and concise description of what the bug is. The error message javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target **To Reproduce** Steps to reproduce the behavior: 1. Use of a self-signed certificate keystore 2. TLS is enabled for the proxy and broker 3. Apache Pulsar is running in an OpenShift cluster 4. Apache Pulsar version 3.2.0 **Expected behavior** Communicating directly with the broker URL works; we expect the same when connecting via the proxy. **Screenshots** If applicable, add screenshots to help explain your problem. **Desktop (please complete the following information):** - OS: Ubuntu **Additional context** Add any other context about the problem here. Error logs:
```
2172 	at sun.security.validator.Validator.validate(Validator.java:264) ~[?:?]
2173 	at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285) ~[?:?]
2174 	at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144) ~[?:?]
2175 	at sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1329) ~[?:?]
2176 	... 17 more
2177 Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
2178 	at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148) ~[?:?]
2179 	at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:129) ~[?:?]
2180 	at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297) ~[?:?]
2181 	at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434) ~[?:?]
2182 	at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306) ~[?:?]
2183 	at sun.security.validator.Validator.validate(Validator.java:264) ~[?:?]
2184 	at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285) ~[?:?]
2185 	at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144) ~[?:?]
2186 	at sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1329) ~[?:?]
2187 	... 17 more
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 [pulsar]
nikhil-ctds commented on PR #0: URL: https://github.com/apache/pulsar/pull/0#issuecomment-2069791969 @lhotari I have tested with `gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728` and was able to offload a ledger of 3.5 GB to the GCS bucket. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading [pulsar]
lhotari merged PR #22554: URL: https://github.com/apache/pulsar/pull/22554 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Offloading large ledgers (>2GB) fail with Google Cloud Storage [pulsar]
lhotari closed issue #15159: Offloading large ledgers (>2GB) fail with Google Cloud Storage URL: https://github.com/apache/pulsar/issues/15159 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new e81f37000ec  [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)
e81f37000ec is described below

commit e81f37000ec212676c5daffa17faad8fc604ff77
Author: Lari Hotari
AuthorDate: Mon Apr 22 18:13:45 2024 +0300

    [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)
---
 conf/broker.conf                                         | 11 ++-
 .../pulsar/common/policies/data/OffloadPoliciesImpl.java |  7 ---
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d482f77da7c..d97e3a5ef89 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1670,10 +1670,10 @@ s3ManagedLedgerOffloadBucket=
 # For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing)
 s3ManagedLedgerOffloadServiceEndpoint=
 
-# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum)
+# For Amazon S3 ledger offload, Max block size in bytes. (64MiB by default, 5MiB minimum)
 s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
 
-# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
+# For Amazon S3 ledger offload, Read buffer size in bytes (1MiB by default)
 s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage ledger offload, region where offload bucket is located.
@@ -1683,10 +1683,11 @@ gcsManagedLedgerOffloadRegion=
 # For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into
 gcsManagedLedgerOffloadBucket=
 
-# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by default, 5MB minimum)
-gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+# For Google Cloud Storage ledger offload, Max block size in bytes. (128MiB by default, 5MiB minimum)
+# Since JClouds limits the maximum number of blocks to 32, the maximum size of a ledger is 32 times the block size.
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728
 
-# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by default)
+# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MiB by default)
 gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage, path to json file containing service account credentials.
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 51e181811c2..6c40aa3f2ed 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -79,8 +79,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
         }
     }
 
-    public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024;  // 64MB
-    public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;  // 1MB
+    public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024;  // 64MiB
+    public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 1024;  // 128MiB
+    public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;  // 1MiB
     public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
     public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
     public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
@@ -163,7 +164,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
     private String gcsManagedLedgerOffloadBucket = null;
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
-    private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
+    private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES;
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
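For context, the new GCS default follows directly from the JClouds constraint mentioned in the config comment (at most 32 blocks per multipart upload). A quick back-of-the-envelope sketch of the arithmetic — the class and method names here are illustrative, not part of the patch:

```java
public class GcsOffloadBlockMath {
    // JClouds caps a multipart upload at 32 blocks, so the largest ledger
    // that can be offloaded is 32 times the configured block size.
    static final int JCLOUDS_MAX_BLOCKS = 32;

    static long maxUploadBytes(long blockSizeBytes) {
        return JCLOUDS_MAX_BLOCKS * blockSizeBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        // Old default: 64 MiB blocks -> 2048 MiB upload limit.
        System.out.println(maxUploadBytes(64 * mib) / mib);   // 2048
        // New default: 128 MiB blocks (134217728 bytes) -> 4096 MiB limit,
        // matching the numbers in the commit subject.
        System.out.println(maxUploadBytes(128 * mib) / mib);  // 4096
    }
}
```

This is why the commit subject describes the change as raising the upload limit from 2048MiB to 4096MiB even though only the block-size default was touched.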
Re: [PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]
lhotari merged PR #22552: URL: https://github.com/apache/pulsar/pull/22552 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated (e81f37000ec -> 35599b73253)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from e81f37000ec [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554) add 35599b73253 [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552) No new revisions were added by this update. Summary of changes: .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++ 2 files changed, 62 insertions(+), 1 deletion(-)
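The one-line change summarized above guards against a terminated managed ledger, which has no writable current ledger during recovery. A simplified, self-contained sketch of the failure mode — field and type names mirror `ManagedLedgerImpl` but this is an illustrative stand-in, not the real class:

```java
import java.util.Objects;

// Minimal sketch of the NPE scenario addressed by #22552.
public class SkippedEntriesSketch {
    record Position(long ledgerId, long entryId) {
        Position next() { return new Position(ledgerId, entryId + 1); }
    }

    Long nextLedgerId;         // null => the skip jumped past the last ledger
    long currentLedgerEntries;
    Long currentLedgerId;      // null after the managed ledger was terminated
    Position lastConfirmedEntry;

    Position validPositionAfterSkip() {
        if (nextLedgerId == null) {
            // Pre-fix, this branch dereferenced the current ledger whenever it
            // held zero entries; on a terminated (read-only) ledger there is no
            // current ledger, so that dereference threw NullPointerException.
            if (currentLedgerEntries == 0 && currentLedgerId != null) {
                return new Position(currentLedgerId, 0);
            }
            return lastConfirmedEntry.next();
        }
        return new Position(nextLedgerId, 0);
    }

    public static void main(String[] args) {
        SkippedEntriesSketch ml = new SkippedEntriesSketch();
        ml.nextLedgerId = null;
        ml.currentLedgerEntries = 0;
        ml.currentLedgerId = null;                 // terminated ledger
        ml.lastConfirmedEntry = new Position(5, 1);
        // With the null guard we fall through to lastConfirmedEntry.next()
        // instead of throwing NullPointerException.
        System.out.println(Objects.equals(ml.validPositionAfterSkip(), new Position(5, 2)));
    }
}
```

The test added by the patch exercises exactly this path: terminate a ledger, reopen it, and recover a cursor whose mark-delete position is the last entry.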
Re: [PR] chore(deps): bump golang.org/x/net from 0.17.0 to 0.23.0 [pulsar-client-go]
merlimat merged PR #1209: URL: https://github.com/apache/pulsar-client-go/pull/1209
(pulsar-client-go) branch master updated (b4d45cd3 -> 458defe3)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from b4d45cd3 [Improve] Add admin api GetLeaderBroker (#1203) add 458defe3 chore(deps): bump golang.org/x/net from 0.17.0 to 0.23.0 (#1209) No new revisions were added by this update. Summary of changes: go.mod | 8 go.sum | 16 2 files changed, 12 insertions(+), 12 deletions(-)
(pulsar-client-go) branch dependabot/go_modules/golang.org/x/net-0.23.0 deleted (was 43818365)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/go_modules/golang.org/x/net-0.23.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git was 43818365 chore(deps): bump golang.org/x/net from 0.17.0 to 0.23.0 The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
Re: [PR] [fix][broker] Create new ledger after the current ledger is closed [pulsar]
lhotari commented on PR #22034: URL: https://github.com/apache/pulsar/pull/22034#issuecomment-2070058523 When cherry-picking, it's important to also pick #22552
(pulsar) branch branch-3.1 updated: [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6be74f1adaf  [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)
6be74f1adaf is described below

commit 6be74f1adaf1b6d75a5072ea764cf376fdb02694
Author: Cong Zhao
AuthorDate: Tue Apr 23 00:05:41 2024 +0800

    [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)

    (cherry picked from commit 35599b7325347838203a92ca63b78d134b7864c2)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7a9605830de..d36b85aa10a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3646,7 +3646,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         Long nextLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
         // This means it has jumped to the last position
         if (nextLedgerId == null) {
-            if (currentLedgerEntries == 0) {
+            if (currentLedgerEntries == 0 && currentLedger != null) {
                 return PositionImpl.get(currentLedger.getId(), 0);
             }
             return lastConfirmedEntry.getNext();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index c9bd64171c1..4e3f8b79084 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4695,5 +4695,66 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                 && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());
     }
 
+    @Test
+    public void testRecoverCursorWithTerminateManagedLedger() throws Exception {
+        String mlName = "my_test_ledger";
+        String cursorName = "c1";
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
+
+        // Write some data.
+        Position p0 = ledger.addEntry("entry-0".getBytes());
+        Position p1 = ledger.addEntry("entry-1".getBytes());
+
+        // Read message.
+        List<Entry> entries = c1.readEntries(2);
+        assertEquals(entries.size(), 2);
+        assertEquals(entries.get(0).getPosition(), p0);
+        assertEquals(entries.get(1).getPosition(), p1);
+        entries.forEach(Entry::release);
+
+        // Mark delete the last message.
+        c1.markDelete(p1);
+        Position markDeletedPosition = c1.getMarkDeletedPosition();
+        Assert.assertEquals(markDeletedPosition, p1);
+
+        // Terminate the managed ledger.
+        Position lastPosition = ledger.terminate();
+        assertEquals(lastPosition, p1);
+
+        // Close the ledger.
+        ledger.close();
+
+        // Reopen the ledger.
+        ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+        BookKeeper mockBookKeeper = mock(BookKeeper.class);
+        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ledger,
+                cursorName);
+
+        CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
+        // Recover the cursor.
+        cursor.recover(new VoidCallback() {
+            @Override
+            public void operationComplete() {
+                recoverFuture.complete(null);
+            }
+
+            @Override
+            public void operationFailed(ManagedLedgerException exception) {
+                recoverFuture.completeExceptionally(exception);
+            }
+        });
+
+        recoverFuture.join();
+        assertTrue(recoverFuture.isDone());
+        assertFalse(recoverFuture.isCompletedExceptionally());
+
+        // Verify the cursor state.
+        assertEquals(cursor.getMarkDeletedPosition(), markDeletedPosition);
+        assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }
(pulsar) branch branch-3.0 updated: [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new def695b18b2 [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552) def695b18b2 is described below commit def695b18b270faf4e159b060ee75dbc4e699744 Author: Cong Zhao AuthorDate: Tue Apr 23 00:05:41 2024 +0800 [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552) (cherry picked from commit 35599b7325347838203a92ca63b78d134b7864c2) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index d9891721383..8415fdcede1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3701,7 +3701,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { Long nextLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() + 1); // This means it has jumped to the last position if (nextLedgerId == null) { -if (currentLedgerEntries == 0) { +if (currentLedgerEntries == 0 && currentLedger != null) { return PositionImpl.get(currentLedger.getId(), 0); } return lastConfirmedEntry.getNext(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index c9bd64171c1..4e3f8b79084 100644 --- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4695,5 +4695,66 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); } +@Test +public void testRecoverCursorWithTerminateManagedLedger() throws Exception { +String mlName = "my_test_ledger"; +String cursorName = "c1"; + +ManagedLedgerConfig config = new ManagedLedgerConfig(); +ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config); +ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName); + +// Write some data. +Position p0 = ledger.addEntry("entry-0".getBytes()); +Position p1 = ledger.addEntry("entry-1".getBytes()); + +// Read message. +List entries = c1.readEntries(2); +assertEquals(entries.size(), 2); +assertEquals(entries.get(0).getPosition(), p0); +assertEquals(entries.get(1).getPosition(), p1); +entries.forEach(Entry::release); + +// Mark delete the last message. +c1.markDelete(p1); +Position markDeletedPosition = c1.getMarkDeletedPosition(); +Assert.assertEquals(markDeletedPosition, p1); + +// Terminate the managed ledger. +Position lastPosition = ledger.terminate(); +assertEquals(lastPosition, p1); + +// Close the ledger. +ledger.close(); + +// Reopen the ledger. +ledger = (ManagedLedgerImpl) factory.open(mlName, config); +BookKeeper mockBookKeeper = mock(BookKeeper.class); +final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ledger, +cursorName); + +CompletableFuture recoverFuture = new CompletableFuture<>(); +// Recover the cursor. 
+cursor.recover(new VoidCallback() { +@Override +public void operationComplete() { +recoverFuture.complete(null); +} + +@Override +public void operationFailed(ManagedLedgerException exception) { +recoverFuture.completeExceptionally(exception); +} +}); + +recoverFuture.join(); +assertTrue(recoverFuture.isDone()); +assertFalse(recoverFuture.isCompletedExceptionally()); + +// Verify the cursor state. +assertEquals(cursor.getMarkDeletedPosition(), markDeletedPosition); +assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext()); +} + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); }
(pulsar) branch branch-3.2 updated: [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new a3122b106fc [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552) a3122b106fc is described below commit a3122b106fc227be218af9b611626fcbfb01a5e4 Author: Cong Zhao AuthorDate: Tue Apr 23 00:05:41 2024 +0800 [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552) (cherry picked from commit 35599b7325347838203a92ca63b78d134b7864c2) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1274b347263..bd74629e605 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3700,7 +3700,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { Long nextLedgerId = ledgers.ceilingKey(skippedPosition.getLedgerId() + 1); // This means it has jumped to the last position if (nextLedgerId == null) { -if (currentLedgerEntries == 0) { +if (currentLedgerEntries == 0 && currentLedger != null) { return PositionImpl.get(currentLedger.getId(), 0); } return lastConfirmedEntry.getNext(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index c9bd64171c1..4e3f8b79084 100644 --- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4695,5 +4695,66 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { && cursorReadPosition.getEntryId() == expectReadPosition.getEntryId()); } +@Test +public void testRecoverCursorWithTerminateManagedLedger() throws Exception { +String mlName = "my_test_ledger"; +String cursorName = "c1"; + +ManagedLedgerConfig config = new ManagedLedgerConfig(); +ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config); +ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName); + +// Write some data. +Position p0 = ledger.addEntry("entry-0".getBytes()); +Position p1 = ledger.addEntry("entry-1".getBytes()); + +// Read message. +List entries = c1.readEntries(2); +assertEquals(entries.size(), 2); +assertEquals(entries.get(0).getPosition(), p0); +assertEquals(entries.get(1).getPosition(), p1); +entries.forEach(Entry::release); + +// Mark delete the last message. +c1.markDelete(p1); +Position markDeletedPosition = c1.getMarkDeletedPosition(); +Assert.assertEquals(markDeletedPosition, p1); + +// Terminate the managed ledger. +Position lastPosition = ledger.terminate(); +assertEquals(lastPosition, p1); + +// Close the ledger. +ledger.close(); + +// Reopen the ledger. +ledger = (ManagedLedgerImpl) factory.open(mlName, config); +BookKeeper mockBookKeeper = mock(BookKeeper.class); +final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ledger, +cursorName); + +CompletableFuture recoverFuture = new CompletableFuture<>(); +// Recover the cursor. 
+cursor.recover(new VoidCallback() { +@Override +public void operationComplete() { +recoverFuture.complete(null); +} + +@Override +public void operationFailed(ManagedLedgerException exception) { +recoverFuture.completeExceptionally(exception); +} +}); + +recoverFuture.join(); +assertTrue(recoverFuture.isDone()); +assertFalse(recoverFuture.isCompletedExceptionally()); + +// Verify the cursor state. +assertEquals(cursor.getMarkDeletedPosition(), markDeletedPosition); +assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext()); +} + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); }
(pulsar) 02/02: [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 5c09c20b33c8e9563df82b4f510fe1d7b519f33c Author: Lari Hotari AuthorDate: Mon Apr 22 18:13:45 2024 +0300 [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554) (cherry picked from commit e81f37000ec212676c5daffa17faad8fc604ff77) --- conf/broker.conf | 11 ++- .../pulsar/common/policies/data/OffloadPoliciesImpl.java | 7 --- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index ec0974dca20..34f7ab017e9 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1657,10 +1657,10 @@ s3ManagedLedgerOffloadBucket= # For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) s3ManagedLedgerOffloadServiceEndpoint= -# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) +# For Amazon S3 ledger offload, Max block size in bytes. (64MiB by default, 5MiB minimum) s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864 -# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default) +# For Amazon S3 ledger offload, Read buffer size in bytes (1MiB by default) s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576 # For Google Cloud Storage ledger offload, region where offload bucket is located. @@ -1670,10 +1670,11 @@ gcsManagedLedgerOffloadRegion= # For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into gcsManagedLedgerOffloadBucket= -# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) -gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864 +# For Google Cloud Storage ledger offload, Max block size in bytes. (128MiB by default, 5MiB minimum) +# Since JClouds limits the maximum number of blocks to 32, the maximum size of a ledger is 32 times the block size. 
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728 -# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by default) +# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MiB by default) gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576 # For Google Cloud Storage, path to json file containing service account credentials. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 51e181811c2..6c40aa3f2ed 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -79,8 +79,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { } } -public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MB -public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB +public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MiB +public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 1024; // 128MiB +public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MiB public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2; public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1; public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders"; @@ -163,7 +164,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { private String gcsManagedLedgerOffloadBucket = null; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) -private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; +private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES; @Configuration @JsonProperty(access = 
JsonProperty.Access.READ_WRITE) private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
(pulsar) 01/02: [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit fc136e8fe11e3a16477ca92784debe592c1d8568 Author: Paul Gier AuthorDate: Fri Mar 15 09:46:33 2024 -0500 [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#0) Co-authored-by: 道君 Co-authored-by: Lari Hotari (cherry picked from commit 73dc213d4cec3513a1addbcb3518f441093c57ec) --- jclouds-shaded/pom.xml | 78 ++ pom.xml| 2 +- 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml index 2bf295e6261..58d3da73a1e 100644 --- a/jclouds-shaded/pom.xml +++ b/jclouds-shaded/pom.xml @@ -33,8 +33,17 @@ jclouds-shaded Apache Pulsar :: Jclouds shaded + + +2.10.1 +32.0.0-jre +7.0.0 +2.0.1 +3.0.0 +2.0.0 + - org.apache.jclouds jclouds-allblobstore @@ -61,12 +70,48 @@ jclouds-slf4j ${jclouds.version} - - javax.annotation - javax.annotation-api - + + + +com.google.code.gson +gson +${gson.version} + + +com.google.guava +guava +${guava.version} + + +com.google.inject +guice +${guice.version} + + +com.google.inject.extensions +guice-assistedinject +${guice.version} + + +jakarta.inject +jakarta.inject-api +${jakarta.inject.api.version} + + +jakarta.ws.rs +jakarta.ws.rs-api +${jakarta.ws.rs-api.version} + + +jakarta.annotation +jakarta.annotation-api +${jakarta.annotation-api.version} + + + + @@ -97,13 +142,13 @@ com.google.inject.extensions:guice-multibindings com.google.code.gson:gson org.apache.httpcomponents:* - javax.ws.rs:* com.jamesmurty.utils:* net.iharder:* aopalliance:* - javax.inject:* - javax.annotation:* com.google.errorprone:* + jakarta.inject:jakarta.inject-api + jakarta.annotation:jakarta.annotation-api + jakarta.ws.rs:jakarta.ws.rs-api @@ -112,10 +157,6 @@ com.google org.apache.pulsar.jcloud.shade.com.google - - javax.ws - org.apache.pulsar.jcloud.shade.javax.ws - com.jamesmurty.utils org.apache.pulsar.jcloud.shade.com.jamesmurty.utils @@ -129,18 
+170,17 @@ org.apache.pulsar.jcloud.shade.net.iharder - javax.inject - org.apache.pulsar.jcloud.shade.javax.inject + com.google.errorprone + org.apache.pulsar.jcloud.shade.com.google.errorprone - javax.annotation - org.apache.pulsar.jcloud.shade.javax.annotation + jakarta + org.apache.pulsar.jcloud.shade.jakarta - com.google.errorprone - org.apache.pulsar.jcloud.shade.com.google.errorprone + org.aopalliance + org.apache.pulsar.jcloud.shade.org.aopalliance - diff --git a/pom.xml b/pom.xml index ab609f9feee..fbae234e855 100644 --- a/pom.xml +++ b/pom.xml @@ -184,7 +184,7 @@ flexible messaging model and an intuitive client API. 1.12.262 1.11.3 2.10.10 -2.5.0 +2.6.0 5.1.0 3.42.0.0 8.0.11
(pulsar) branch branch-3.0 updated (def695b18b2 -> 5c09c20b33c)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git from def695b18b2 [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552) new fc136e8fe11 [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#0) new 5c09c20b33c [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: conf/broker.conf | 11 +-- jclouds-shaded/pom.xml | 78 -- pom.xml| 2 +- .../common/policies/data/OffloadPoliciesImpl.java | 7 +- 4 files changed, 70 insertions(+), 28 deletions(-)
(pulsar) branch branch-3.1 updated (6be74f1adaf -> 1c8fd73b9c2)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 6be74f1adaf [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552) new 48ff241a45d [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#0) new 1c8fd73b9c2 [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: conf/broker.conf | 11 +-- jclouds-shaded/pom.xml | 78 -- pom.xml| 2 +- .../common/policies/data/OffloadPoliciesImpl.java | 7 +- 4 files changed, 70 insertions(+), 28 deletions(-)
(pulsar) 01/02: [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 48ff241a45d795d9dbd64aa9377c42f67219ecaa Author: Paul Gier AuthorDate: Fri Mar 15 09:46:33 2024 -0500 [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#0) Co-authored-by: 道君 Co-authored-by: Lari Hotari (cherry picked from commit 73dc213d4cec3513a1addbcb3518f441093c57ec) --- jclouds-shaded/pom.xml | 78 ++ pom.xml| 2 +- 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml index 8ee11030b6f..d39071bb6a5 100644 --- a/jclouds-shaded/pom.xml +++ b/jclouds-shaded/pom.xml @@ -33,8 +33,17 @@ jclouds-shaded Apache Pulsar :: Jclouds shaded + + +2.10.1 +32.0.0-jre +7.0.0 +2.0.1 +3.0.0 +2.0.0 + - org.apache.jclouds jclouds-allblobstore @@ -61,12 +70,48 @@ jclouds-slf4j ${jclouds.version} - - javax.annotation - javax.annotation-api - + + + +com.google.code.gson +gson +${gson.version} + + +com.google.guava +guava +${guava.version} + + +com.google.inject +guice +${guice.version} + + +com.google.inject.extensions +guice-assistedinject +${guice.version} + + +jakarta.inject +jakarta.inject-api +${jakarta.inject.api.version} + + +jakarta.ws.rs +jakarta.ws.rs-api +${jakarta.ws.rs-api.version} + + +jakarta.annotation +jakarta.annotation-api +${jakarta.annotation-api.version} + + + + @@ -97,13 +142,13 @@ com.google.inject.extensions:guice-multibindings com.google.code.gson:gson org.apache.httpcomponents:* - javax.ws.rs:* com.jamesmurty.utils:* net.iharder:* aopalliance:* - javax.inject:* - javax.annotation:* com.google.errorprone:* + jakarta.inject:jakarta.inject-api + jakarta.annotation:jakarta.annotation-api + jakarta.ws.rs:jakarta.ws.rs-api @@ -112,10 +157,6 @@ com.google org.apache.pulsar.jcloud.shade.com.google - - javax.ws - org.apache.pulsar.jcloud.shade.javax.ws - com.jamesmurty.utils org.apache.pulsar.jcloud.shade.com.jamesmurty.utils @@ -129,18 
+170,17 @@ org.apache.pulsar.jcloud.shade.net.iharder - javax.inject - org.apache.pulsar.jcloud.shade.javax.inject + com.google.errorprone + org.apache.pulsar.jcloud.shade.com.google.errorprone - javax.annotation - org.apache.pulsar.jcloud.shade.javax.annotation + jakarta + org.apache.pulsar.jcloud.shade.jakarta - com.google.errorprone - org.apache.pulsar.jcloud.shade.com.google.errorprone + org.aopalliance + org.apache.pulsar.jcloud.shade.org.aopalliance - diff --git a/pom.xml b/pom.xml index 8a0ed47fb8d..b4b68b99031 100644 --- a/pom.xml +++ b/pom.xml @@ -184,7 +184,7 @@ flexible messaging model and an intuitive client API. 1.12.262 1.11.3 2.10.10 -2.5.0 +2.6.0 5.1.0 3.42.0.0 8.0.11
(pulsar) 02/02: [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 1c8fd73b9c2c72cd91297723d6681dcaff82d8e7 Author: Lari Hotari AuthorDate: Mon Apr 22 18:13:45 2024 +0300 [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554) (cherry picked from commit e81f37000ec212676c5daffa17faad8fc604ff77) --- conf/broker.conf | 11 ++- .../pulsar/common/policies/data/OffloadPoliciesImpl.java | 7 --- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 2dbcff12b16..74be6803fe2 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1641,10 +1641,10 @@ s3ManagedLedgerOffloadBucket= # For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) s3ManagedLedgerOffloadServiceEndpoint= -# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) +# For Amazon S3 ledger offload, Max block size in bytes. (64MiB by default, 5MiB minimum) s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864 -# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default) +# For Amazon S3 ledger offload, Read buffer size in bytes (1MiB by default) s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576 # For Google Cloud Storage ledger offload, region where offload bucket is located. @@ -1654,10 +1654,11 @@ gcsManagedLedgerOffloadRegion= # For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into gcsManagedLedgerOffloadBucket= -# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) -gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864 +# For Google Cloud Storage ledger offload, Max block size in bytes. (128MiB by default, 5MiB minimum) +# Since JClouds limits the maximum number of blocks to 32, the maximum size of a ledger is 32 times the block size. 
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728 -# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by default) +# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MiB by default) gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576 # For Google Cloud Storage, path to json file containing service account credentials. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 51e181811c2..6c40aa3f2ed 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -79,8 +79,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { } } -public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MB -public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB +public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MiB +public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 1024; // 128MiB +public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MiB public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2; public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1; public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders"; @@ -163,7 +164,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { private String gcsManagedLedgerOffloadBucket = null; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) -private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; +private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES; @Configuration @JsonProperty(access = 
JsonProperty.Access.READ_WRITE) private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
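The broker.conf comment in the commit above gives the reasoning behind the new default: JClouds caps a multi-part upload at 32 blocks, so the maximum offloadable object size is 32 times the block size — 64 MiB × 32 = 2048 MiB before, 128 MiB × 32 = 4096 MiB after, matching the commit title. A small stand-alone sketch of that arithmetic (the class and constant names are illustrative, not Pulsar's):

```java
public class GcsOffloadLimit {
    // New default GCS max block size from this commit: 128 MiB (134217728 bytes)
    static final long GCS_BLOCK_SIZE_BYTES = 128L * 1024 * 1024;
    // JClouds limits a multi-part upload to 32 blocks per object
    static final int JCLOUDS_MAX_BLOCKS = 32;

    // Upper bound on a single offloaded object: block size times block count
    static long maxUploadBytes(long blockSizeBytes, int maxBlocks) {
        return blockSizeBytes * maxBlocks;
    }

    public static void main(String[] args) {
        long maxBytes = maxUploadBytes(GCS_BLOCK_SIZE_BYTES, JCLOUDS_MAX_BLOCKS);
        System.out.println(maxBytes / (1024L * 1024) + " MiB"); // prints: 4096 MiB
    }
}
```

With the previous 64 MiB default the same formula yields 2048 MiB, which is the old limit the commit title refers to.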
(pulsar) 02/02: [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 01671b16417f553a6bf51257ed74202656339d83 Author: Lari Hotari AuthorDate: Mon Apr 22 18:13:45 2024 +0300 [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554) (cherry picked from commit e81f37000ec212676c5daffa17faad8fc604ff77) --- conf/broker.conf | 11 ++- .../pulsar/common/policies/data/OffloadPoliciesImpl.java | 7 --- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 2d5221c65c4..dd0f3e49e1f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1665,10 +1665,10 @@ s3ManagedLedgerOffloadBucket= # For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) s3ManagedLedgerOffloadServiceEndpoint= -# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) +# For Amazon S3 ledger offload, Max block size in bytes. (64MiB by default, 5MiB minimum) s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864 -# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default) +# For Amazon S3 ledger offload, Read buffer size in bytes (1MiB by default) s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576 # For Google Cloud Storage ledger offload, region where offload bucket is located. @@ -1678,10 +1678,11 @@ gcsManagedLedgerOffloadRegion= # For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into gcsManagedLedgerOffloadBucket= -# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) -gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864 +# For Google Cloud Storage ledger offload, Max block size in bytes. (128MiB by default, 5MiB minimum) +# Since JClouds limits the maximum number of blocks to 32, the maximum size of a ledger is 32 times the block size. 
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728 -# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by default) +# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MiB by default) gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576 # For Google Cloud Storage, path to json file containing service account credentials. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 51e181811c2..6c40aa3f2ed 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -79,8 +79,9 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { } } -public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MB -public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB +public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MiB +public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 1024; // 128MiB +public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MiB public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2; public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1; public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders"; @@ -163,7 +164,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { private String gcsManagedLedgerOffloadBucket = null; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) -private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; +private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES; @Configuration @JsonProperty(access = 
JsonProperty.Access.READ_WRITE) private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
(pulsar) 01/02: [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 83a243c4df4f5a8492d0fa3850f14ab3db1051a5 Author: Paul Gier AuthorDate: Fri Mar 15 09:46:33 2024 -0500 [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#0) Co-authored-by: 道君 Co-authored-by: Lari Hotari (cherry picked from commit 73dc213d4cec3513a1addbcb3518f441093c57ec) --- jclouds-shaded/pom.xml | 78 ++ pom.xml| 2 +- 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml index 8c71222092b..92a3c4fee81 100644 --- a/jclouds-shaded/pom.xml +++ b/jclouds-shaded/pom.xml @@ -33,8 +33,17 @@ jclouds-shaded Apache Pulsar :: Jclouds shaded + + +2.10.1 +32.0.0-jre +7.0.0 +2.0.1 +3.0.0 +2.0.0 + - org.apache.jclouds jclouds-allblobstore @@ -61,12 +70,48 @@ jclouds-slf4j ${jclouds.version} - - javax.annotation - javax.annotation-api - + + + +com.google.code.gson +gson +${gson.version} + + +com.google.guava +guava +${guava.version} + + +com.google.inject +guice +${guice.version} + + +com.google.inject.extensions +guice-assistedinject +${guice.version} + + +jakarta.inject +jakarta.inject-api +${jakarta.inject.api.version} + + +jakarta.ws.rs +jakarta.ws.rs-api +${jakarta.ws.rs-api.version} + + +jakarta.annotation +jakarta.annotation-api +${jakarta.annotation-api.version} + + + + @@ -97,13 +142,13 @@ com.google.inject.extensions:guice-multibindings com.google.code.gson:gson org.apache.httpcomponents:* - javax.ws.rs:* com.jamesmurty.utils:* net.iharder:* aopalliance:* - javax.inject:* - javax.annotation:* com.google.errorprone:* + jakarta.inject:jakarta.inject-api + jakarta.annotation:jakarta.annotation-api + jakarta.ws.rs:jakarta.ws.rs-api @@ -112,10 +157,6 @@ com.google org.apache.pulsar.jcloud.shade.com.google - - javax.ws - org.apache.pulsar.jcloud.shade.javax.ws - com.jamesmurty.utils org.apache.pulsar.jcloud.shade.com.jamesmurty.utils @@ -129,18 
+170,17 @@ org.apache.pulsar.jcloud.shade.net.iharder - javax.inject - org.apache.pulsar.jcloud.shade.javax.inject + com.google.errorprone + org.apache.pulsar.jcloud.shade.com.google.errorprone - javax.annotation - org.apache.pulsar.jcloud.shade.javax.annotation + jakarta + org.apache.pulsar.jcloud.shade.jakarta - com.google.errorprone - org.apache.pulsar.jcloud.shade.com.google.errorprone + org.aopalliance + org.apache.pulsar.jcloud.shade.org.aopalliance - diff --git a/pom.xml b/pom.xml index bd41ebbbed9..8aa8bf36c98 100644 --- a/pom.xml +++ b/pom.xml @@ -185,7 +185,7 @@ flexible messaging model and an intuitive client API. 1.12.262 1.11.3 2.10.10 -2.5.0 +2.6.0 5.1.0 3.42.0.0 8.0.11
(pulsar) branch branch-3.2 updated (a3122b106fc -> 01671b16417)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git from a3122b106fc [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552) new 83a243c4df4 [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220) new 01671b16417 [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: conf/broker.conf | 11 +-- jclouds-shaded/pom.xml | 78 -- pom.xml | 2 +- .../common/policies/data/OffloadPoliciesImpl.java | 7 +- 4 files changed, 70 insertions(+), 28 deletions(-)
Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]
lhotari commented on code in PR #22556: URL: https://github.com/apache/pulsar/pull/22556#discussion_r1575054611 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java: ## @@ -745,8 +746,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats replicators.forEach((region, replicator) -> replicator.updateRates()); -nsStats.producerCount += producers.size(); -bundleStats.producerCount += producers.size(); +final AtomicInteger producerCount = new AtomicInteger(); Review Comment: I guess this could be a `org.apache.commons.lang3.mutable.MutableInt` since this is used in single threaded code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]
lhotari commented on code in PR #22556: URL: https://github.com/apache/pulsar/pull/22556#discussion_r1575054920 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -2107,8 +2108,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats replicators.forEach((region, replicator) -> replicator.updateRates()); -nsStats.producerCount += producers.size(); -bundleStats.producerCount += producers.size(); +final AtomicInteger producerCount = new AtomicInteger(); Review Comment: I guess this could be a `org.apache.commons.lang3.mutable.MutableInt` since this is used in single threaded code.
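The review suggestion above rests on a Java constraint worth spelling out: a local variable captured by a lambda must be effectively final, so a counter incremented inside `forEach` needs a mutable holder object. `AtomicInteger` works but pays for atomic operations the single-threaded stats path does not need, which is why `MutableInt` is proposed. A stdlib-only sketch of the pattern (the replicator-name filter is purely illustrative, not Pulsar's actual check):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerCountSketch {
    // Count non-replicator producers from inside a forEach lambda. A plain
    // "int count" cannot be mutated there (captured locals must be
    // effectively final), hence the holder object. AtomicInteger is used
    // here to stay stdlib-only; in single-threaded code a non-atomic holder
    // such as Commons Lang's MutableInt avoids the unnecessary CAS overhead.
    static int countProducers(List<String> producerNames) {
        AtomicInteger producerCount = new AtomicInteger();
        producerNames.forEach(name -> {
            boolean isReplicator = name.startsWith("repl."); // illustrative filter
            if (!isReplicator) {
                producerCount.incrementAndGet();
            }
        });
        return producerCount.get();
    }

    public static void main(String[] args) {
        System.out.println(countProducers(List.of("app-1", "repl.west", "app-2"))); // prints: 2
    }
}
```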
Re: [PR] [improve][misc] Upgrade to Bookkeeper 4.17.0 [pulsar]
merlimat merged PR #22551: URL: https://github.com/apache/pulsar/pull/22551
(pulsar) branch master updated: [improve][misc] Upgrade to Bookkeeper 4.17.0 (#22551)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a037fa33eee [improve][misc] Upgrade to Bookkeeper 4.17.0 (#22551) a037fa33eee is described below commit a037fa33a6b0bc052c4aa960a55ca8bd0ca2 Author: Lari Hotari AuthorDate: Mon Apr 22 19:38:11 2024 +0300 [improve][misc] Upgrade to Bookkeeper 4.17.0 (#22551) --- distribution/server/src/assemble/LICENSE.bin.txt | 100 +++ distribution/shell/src/assemble/LICENSE.bin.txt | 8 +- pom.xml | 6 +- 3 files changed, 57 insertions(+), 57 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 4dc6e434167..93fd46d44b5 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -262,7 +262,7 @@ The Apache Software License, Version 2.0 - com.fasterxml.jackson.module-jackson-module-parameter-names-2.14.2.jar * Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar - * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.9.0.jar + * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.17.0.jar * Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar * Gson - com.google.code.gson-gson-2.8.9.jar @@ -356,34 +356,34 @@ The Apache Software License, Version 2.0 - net.java.dev.jna-jna-jpms-5.12.1.jar - net.java.dev.jna-jna-platform-jpms-5.12.1.jar * BookKeeper -- org.apache.bookkeeper-bookkeeper-common-4.16.5.jar -- org.apache.bookkeeper-bookkeeper-common-allocator-4.16.5.jar -- org.apache.bookkeeper-bookkeeper-proto-4.16.5.jar -- org.apache.bookkeeper-bookkeeper-server-4.16.5.jar -- org.apache.bookkeeper-bookkeeper-tools-framework-4.16.5.jar -- org.apache.bookkeeper-circe-checksum-4.16.5.jar -- 
org.apache.bookkeeper-cpu-affinity-4.16.5.jar -- org.apache.bookkeeper-statelib-4.16.5.jar -- org.apache.bookkeeper-stream-storage-api-4.16.5.jar -- org.apache.bookkeeper-stream-storage-common-4.16.5.jar -- org.apache.bookkeeper-stream-storage-java-client-4.16.5.jar -- org.apache.bookkeeper-stream-storage-java-client-base-4.16.5.jar -- org.apache.bookkeeper-stream-storage-proto-4.16.5.jar -- org.apache.bookkeeper-stream-storage-server-4.16.5.jar -- org.apache.bookkeeper-stream-storage-service-api-4.16.5.jar -- org.apache.bookkeeper-stream-storage-service-impl-4.16.5.jar -- org.apache.bookkeeper.http-http-server-4.16.5.jar -- org.apache.bookkeeper.http-vertx-http-server-4.16.5.jar -- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.16.5.jar -- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.16.5.jar -- org.apache.distributedlog-distributedlog-common-4.16.5.jar -- org.apache.distributedlog-distributedlog-core-4.16.5-tests.jar -- org.apache.distributedlog-distributedlog-core-4.16.5.jar -- org.apache.distributedlog-distributedlog-protocol-4.16.5.jar -- org.apache.bookkeeper.stats-codahale-metrics-provider-4.16.5.jar -- org.apache.bookkeeper-bookkeeper-slogger-api-4.16.5.jar -- org.apache.bookkeeper-bookkeeper-slogger-slf4j-4.16.5.jar -- org.apache.bookkeeper-native-io-4.16.5.jar +- org.apache.bookkeeper-bookkeeper-common-4.17.0.jar +- org.apache.bookkeeper-bookkeeper-common-allocator-4.17.0.jar +- org.apache.bookkeeper-bookkeeper-proto-4.17.0.jar +- org.apache.bookkeeper-bookkeeper-server-4.17.0.jar +- org.apache.bookkeeper-bookkeeper-tools-framework-4.17.0.jar +- org.apache.bookkeeper-circe-checksum-4.17.0.jar +- org.apache.bookkeeper-cpu-affinity-4.17.0.jar +- org.apache.bookkeeper-statelib-4.17.0.jar +- org.apache.bookkeeper-stream-storage-api-4.17.0.jar +- org.apache.bookkeeper-stream-storage-common-4.17.0.jar +- org.apache.bookkeeper-stream-storage-java-client-4.17.0.jar +- org.apache.bookkeeper-stream-storage-java-client-base-4.17.0.jar +- 
org.apache.bookkeeper-stream-storage-proto-4.17.0.jar +- org.apache.bookkeeper-stream-storage-server-4.17.0.jar +- org.apache.bookkeeper-stream-storage-service-api-4.17.0.jar +- org.apache.bookkeeper-stream-storage-service-impl-4.17.0.jar +- org.apache.bookkeeper.http-http-server-4.17.0.jar +- org.apache.bookkeeper.http-vertx-http-server-4.17.0.jar +- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.17.0.jar +- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.17.0.jar +- org.apache.distributedlog-distributedlog-common-4.17.0.jar +- org.apache.distributedlog-distributedlog-core-4.17.0-tests.jar +- org.apache.distributedlog-distributedlog-core-4.17.0.jar +- org.apache.distr
[PR] [improve][cli] Add generate-completion command to admin and client [pulsar]
nodece opened a new pull request, #22557: URL: https://github.com/apache/pulsar/pull/22557 ### Motivation Tab completion is an important feature. Since we migrated the CLI parser from jcommander to picocli, we can now use picocli's autocompletion support to provide it. - pulsar-admin ``` ❯ source <(bin/pulsar-admin generate-completion) ❯ bin/pulsar-admin topics clear-backlog --subscription --help --subscription --version -h -s -v ``` - pulsar-client ``` ❯ source <(bin/pulsar-client generate-completion) ❯ bin/pulsar-client produce -h --chunking --files --key-value-key --rate -db -h -kvkf -s --disable-batching --help --key-value-key-file --separator -dr -k -m -v --disable-replication --key --messages --value-schema -ekn -ks -n -vs --encryption-key-name --key-schema --num-produce --version -ekv -kvet -p --encryption-key-value --key-value-encoding-type --properties -c -f -kvk -r ``` ### Modifications - Add `generate-completion` command to the `pulsar-admin` and `pulsar-client` ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete`
Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]
dragosvictor commented on code in PR #1208: URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575072107 ## pulsar/producer_partition.go: ## @@ -373,10 +390,14 @@ func (p *partitionProducer) GetBuffer() internal.Buffer { return b } -func (p *partitionProducer) ConnectionClosed() { +func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProducer) { Review Comment: Good catch, fixed!
Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]
dragosvictor commented on code in PR #1208: URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575070621 ## pulsar/internal/pulsar_proto/PulsarApi.pb.go: ## @@ -18,8 +18,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.9 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 Review Comment: Done! Note that it really only changed the version comments, everything else stayed the same: https://github.com/apache/pulsar-client-go/pull/1208/commits/880d732ed54d774b539d09897f273e24e5b7608f.
Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]
massakam commented on code in PR #22556: URL: https://github.com/apache/pulsar/pull/22556#discussion_r1575095343 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java: ## @@ -745,8 +746,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats replicators.forEach((region, replicator) -> replicator.updateRates()); -nsStats.producerCount += producers.size(); -bundleStats.producerCount += producers.size(); +final AtomicInteger producerCount = new AtomicInteger(); Review Comment: Replaced `AtomicInteger` with `MutableInt`. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -2107,8 +2108,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats replicators.forEach((region, replicator) -> replicator.updateRates()); -nsStats.producerCount += producers.size(); -bundleStats.producerCount += producers.size(); +final AtomicInteger producerCount = new AtomicInteger(); Review Comment: Replaced `AtomicInteger` with `MutableInt`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder [pulsar]
lhotari commented on PR #22541: URL: https://github.com/apache/pulsar/pull/22541#issuecomment-2070268832 the `setMaxConnectionsPerHost` in async http client doesn't seem to behave as expected. Will check the errors ``` Caused by: java.util.concurrent.CompletionException: org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: Could not complete the operation. Number of retries has been exhausted. Failed reason: Too many connections: 16 at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:674) at java.base/java.util.concurrent.CompletableFuture.orApplyStage(CompletableFuture.java:1601) at java.base/java.util.concurrent.CompletableFuture.applyToEither(CompletableFuture.java:2261) at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.retryOrTimeOut(AsyncHttpConnector.java:275) at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:234) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
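The "Too many connections: 16" failure reported above is the behavior of a hard per-host cap: once all connection slots are held, further acquisitions are rejected immediately rather than queued, so the retry loop exhausts its attempts. A rough stdlib analogy using a `Semaphore` (this sketches the failure mode only; it is not how AsyncHttpClient implements its pool):

```java
import java.util.concurrent.Semaphore;

public class PerHostConnectionPool {
    private final int maxConnectionsPerHost;
    private final Semaphore permits;

    PerHostConnectionPool(int maxConnectionsPerHost) {
        this.maxConnectionsPerHost = maxConnectionsPerHost;
        this.permits = new Semaphore(maxConnectionsPerHost);
    }

    // Try to take a connection slot; fail fast in the same style as the
    // "Too many connections" error quoted in the comment above.
    void acquire() {
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("Too many connections: " + maxConnectionsPerHost);
        }
    }

    void release() {
        permits.release();
    }

    public static void main(String[] args) {
        PerHostConnectionPool pool = new PerHostConnectionPool(16);
        for (int i = 0; i < 16; i++) {
            pool.acquire(); // 16 in-flight requests hold every slot
        }
        try {
            pool.acquire(); // the 17th request is rejected immediately
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints: Too many connections: 16
        }
    }
}
```

Under this model, retries only help if some request completes and releases a slot in the meantime; if all 16 connections stay busy for the whole retry window, every attempt fails the same way.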
Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]
dragosvictor commented on code in PR #1208: URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575129127 ## pulsar/internal/lookup_service.go: ## @@ -30,8 +30,9 @@ import ( // LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr. type LookupResult struct { - LogicalAddr *url.URL - PhysicalAddr *url.URL + IsProxyThroughServiceURL bool Review Comment: You're right, it's never used outside of `lookupService.GetBrokerAddress`, where it is retrieved and consumed at the same time. Removed, thanks!
Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]
GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer Hi, i have problems to send a message with a python producer which should be processed by an InfluxDBv2 sink connector. With a java test code it is possible. Java class: ` public class Device { public String measurement; public long timestamp; public Map tags; public Map fields; }` Set values: ` Device device = new Device(); device.measurement = "test5"; device.timestamp = Instant.now().toEpochMilli(); device.tags = Maps.newHashMap(); device.tags.put("foo", "bar"); device.fields = Maps.newHashMap(); device.fields.put("temp", 14.8);` Create producer: Producer producer = client.newProducer(JSONSchema.of(Device.class)) .topic("persistent://public/default/foobar") .producerName("test") .create(); Send message: MessageId msgID = producer.newMessage() .key("test") .value(device) .send(); Any suggestions how to do with Python ? Thanks GitHub link: https://github.com/apache/pulsar/discussions/22558 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]
GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer Hi, i have problems to send a message with a python producer which should be processed by an InfluxDBv2 sink connector. With a java test code it is possible. Java class: ` public class Device { public String measurement; public long timestamp; public Map tags; public Map fields; }` Set values: ` Device device = new Device(); device.measurement = "test5"; device.timestamp = Instant.now().toEpochMilli(); device.tags = Maps.newHashMap(); device.tags.put("foo", "bar"); device.fields = Maps.newHashMap(); device.fields.put("temp", 14.8); ` Create producer: Producer producer = client.newProducer(JSONSchema.of(Device.class)) .topic("persistent://public/default/foobar") .producerName("test") .create(); Send message: MessageId msgID = producer.newMessage() .key("test") .value(device) .send(); Any suggestions how to do with Python ? Thanks GitHub link: https://github.com/apache/pulsar/discussions/22558 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]
GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer Hi, i have problems to send a message with a python producer which should be processed by an InfluxDBv2 sink connector. With a java test code it is possible. Java class: ` public class Device { public String measurement; public long timestamp; public Map tags; public Map fields; }` Set values: `Device device = new Device(); device.measurement = "test5"; device.timestamp = Instant.now().toEpochMilli(); device.tags = Maps.newHashMap(); device.tags.put("foo", "bar"); device.fields = Maps.newHashMap(); device.fields.put("temp", 14.8);` Create producer: Producer producer = client.newProducer(JSONSchema.of(Device.class)) .topic("persistent://public/default/foobar") .producerName("test") .create(); Send message: MessageId msgID = producer.newMessage() .key("test") .value(device) .send(); Any suggestions how to do with Python ? Thanks GitHub link: https://github.com/apache/pulsar/discussions/22558 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]
GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer Hi, i have problems to send a message with a python producer which should be processed by an InfluxDBv2 sink connector. With a java test code it is possible. Java class: ` public class Device { public String measurement; public long timestamp; public Map tags; public Map fields; }` Set values: `Device device = new Device(); device.measurement = "test5"; device.timestamp = Instant.now().toEpochMilli(); device.tags = Maps.newHashMap(); device.tags.put("foo", "bar"); device.fields = Maps.newHashMap(); device.fields.put("temp", 14.8);` Device device = new Device(); device.measurement = "test5"; device.timestamp = Instant.now().toEpochMilli(); device.tags = Maps.newHashMap(); device.tags.put("foo", "bar"); device.fields = Maps.newHashMap(); device.fields.put("temp", 14.8); Create producer: Producer producer = client.newProducer(JSONSchema.of(Device.class)) .topic("persistent://public/default/foobar") .producerName("test") .create(); Send message: MessageId msgID = producer.newMessage() .key("test") .value(device) .send(); Any suggestions how to do with Python ? Thanks GitHub link: https://github.com/apache/pulsar/discussions/22558 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]
GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer Hi, i have problems to send a message with a python producer which should be processed by an InfluxDBv2 sink connector. With a java test code it is possible. Java class: ` public class Device { public String measurement; public long timestamp; public Map tags; public Map fields; }` Set values: `Device device = new Device(); device.measurement = "test5"; device.timestamp = Instant.now().toEpochMilli(); device.tags = Maps.newHashMap(); device.tags.put("foo", "bar"); device.fields = Maps.newHashMap(); device.fields.put("temp", 14.8);` Device device = new Device(); device.measurement = "test5"; device.timestamp = Instant.now().toEpochMilli(); device.tags = Maps.newHashMap(); device.tags.put("foo", "bar"); device.fields = Maps.newHashMap(); device.fields.put("temp", 14.8); Create producer: Producer producer = client.newProducer(JSONSchema.of(Device.class)) .topic("persistent://public/default/foobar") .producerName("test") .create(); Send message: MessageId msgID = producer.newMessage() .key("test") .value(device) .send(); Any suggestions how to do with Python ? Thanks GitHub link: https://github.com/apache/pulsar/discussions/22558 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]
GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer Hi, i have problems to send a message with a python producer which should be processed by an InfluxDBv2 sink connector. With a java test code it is possible. Java class: public class Device { public String measurement; public long timestamp; public Map tags; public Map fields; } Set values: Device device = new Device(); device.measurement = "test5"; device.timestamp = Instant.now().toEpochMilli(); device.tags = Maps.newHashMap(); device.tags.put("foo", "bar"); device.fields = Maps.newHashMap(); device.fields.put("temp", 14.8); Create producer: Producer producer = client.newProducer(JSONSchema.of(Device.class)) .topic("persistent://public/default/foobar") .producerName("test") .create(); Send message: MessageId msgID = producer.newMessage() .key("test") .value(device) .send(); Any suggestions how to do with Python ? Thanks GitHub link: https://github.com/apache/pulsar/discussions/22558 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] Pulsar 3.2.2 correct Message send to InfluxDBv2 sink connector with Python Producer [pulsar]
GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message send to InfluxDBv2 sink connector with Python Producer Hi, i have problems to send a message with a python producer which should be processed by an InfluxDBv2 sink connector. With a java test code it is possible. Java class: public class Device { public String measurement; public long timestamp; public Map tags; public Map fields; } Set values: Device device = new Device(); device.measurement = "test5"; device.timestamp = Instant.now().toEpochMilli(); device.tags = Maps.newHashMap(); device.tags.put("foo", "bar"); device.fields = Maps.newHashMap(); device.fields.put("temp", 14.8); Create producer: Producer producer = client.newProducer(JSONSchema.of(Device.class)) .topic("persistent://public/default/foobar") .producerName("test") .create(); Send message: MessageId msgID = producer.newMessage() .key("test") .value(device) .send(); Any suggestions how to do with Python ? Thanks GitHub link: https://github.com/apache/pulsar/discussions/22558 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [I] Argo CD unable to generate template with pulsar chart 3.3.0 [pulsar-helm-chart]
hpvd commented on issue #471: URL: https://github.com/apache/pulsar-helm-chart/issues/471#issuecomment-2070823328 Version 3.4.0 of the Apache Pulsar Helm chart is now available: https://github.com/apache/pulsar-helm-chart/releases/tag/pulsar-3.4.0 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug] broker healthcheck ran into loop after decommissioned a cluster of bookies [pulsar]
wallacepeng commented on issue #22559: URL: https://github.com/apache/pulsar/issues/22559#issuecomment-2070891379 (screenshot attachment: https://github.com/apache/pulsar/assets/894641/f8879642-e3e2-40b1-b063-308e957e693a) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] [Bug] broker healthcheck ran into loop after decommissioned a cluster of bookies [pulsar]
wallacepeng opened a new issue, #22559: URL: https://github.com/apache/pulsar/issues/22559 ### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Read release policy - [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker. ### Version Pulsar 2.10.5 ### Minimal reproduce step 1. Set up two BookKeeper clusters using Helm charts: bookkeeper and bookkeeper1. 2. Make bookkeeper read-only. 3. Decommission bookkeeper down to zero replicas (as we are using Kubernetes, scale down one node; autorecovery replicates the ledgers). 4. Restart brokers. 5. The broker ran into a loop on health check. ### What did you expect to see? The broker health check should continue to work. ### What did you see instead? The broker health check ran into a loop. ### Anything else? _No response_ ### Are you willing to submit a PR? - [X] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] Update Oxia client to 0.1.6 [pulsar]
merlimat commented on PR #22525: URL: https://github.com/apache/pulsar/pull/22525#issuecomment-2070940582 > ``` > LockManagerTest.acquireLocks:66 » NoClassDefFound Could not initialize class io.streamnative.oxia.proto.ListResponse > ``` This was due to protobuf version conflicts. Solved by merging with master -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] Update Oxia client to 0.1.6 [pulsar]
merlimat merged PR #22525: URL: https://github.com/apache/pulsar/pull/22525 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [improve] Update Oxia client to 0.1.6 (#22525)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c72c135541e [improve] Update Oxia client to 0.1.6 (#22525) c72c135541e is described below commit c72c135541e14043370836421cfef372b1d0a0ea Author: Matteo Merli AuthorDate: Mon Apr 22 14:15:36 2024 -0700 [improve] Update Oxia client to 0.1.6 (#22525) --- distribution/licenses/LICENSE-Reactive-gRPC.txt | 29 distribution/server/src/assemble/LICENSE.bin.txt | 10 +++- pom.xml | 3 +-- pulsar-metadata/pom.xml | 1 - 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/distribution/licenses/LICENSE-Reactive-gRPC.txt b/distribution/licenses/LICENSE-Reactive-gRPC.txt new file mode 100644 index 000..bc589401e7b --- /dev/null +++ b/distribution/licenses/LICENSE-Reactive-gRPC.txt @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2019, Salesforce.com, Inc. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 93fd46d44b5..c5642503b25 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -481,7 +481,12 @@ The Apache Software License, Version 2.0 * Prometheus - io.prometheus-simpleclient_httpserver-0.16.0.jar * Oxia -- io.streamnative.oxia-oxia-client-0.1.0-shaded.jar +- io.streamnative.oxia-oxia-client-0.1.6.jar +- io.streamnative.oxia-oxia-client-metrics-api-0.1.6.jar + * OpenHFT +- net.openhft-zero-allocation-hashing-0.16.jar + * Project reactor +- io.projectreactor-reactor-core-3.5.2.jar * Java JSON WebTokens - io.jsonwebtoken-jjwt-api-0.11.1.jar - io.jsonwebtoken-jjwt-impl-0.11.1.jar @@ -548,6 +553,9 @@ BSD 3-clause "New" or "Revised" License * JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt * JLine -- jline-jline-2.14.6.jar -- ../licenses/LICENSE-JLine.txt * JLine3 -- org.jline-jline-3.21.0.jar -- ../licenses/LICENSE-JLine.txt + * Reactive gRPC +- com.salesforce.servicelibs-reactive-grpc-common-1.2.4.jar -- ../licenses/LICENSE-Reactive-gRPC.txt +- com.salesforce.servicelibs-reactor-grpc-stub-1.2.4.jar -- ../licenses/LICENSE-Reactive-gRPC.txt BSD 2-Clause License * HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- ../licenses/LICENSE-HdrHistogram.txt diff --git a/pom.xml b/pom.xml index 
168eddaf2fe..90b6c8cb8ed 100644 --- a/pom.xml +++ b/pom.xml @@ -248,7 +248,7 @@ flexible messaging model and an intuitive client API. 4.5.13 4.4.15 0.7.5 -0.1.0 +0.1.6 2.0 1.10.12 5.3.3 @@ -1193,7 +1193,6 @@ flexible messaging model and an intuitive client API. io.streamnative.oxia oxia-client ${oxia.version} -shaded io.streamnative.oxia diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml index 8600d0ea191..163a3058dc4 100644 --- a/pulsar-metadata/pom.xml +++ b/pulsar-metadata/pom.xml @@ -65,7 +65,6 @@ io.streamnative.oxia oxia-client - shaded
Re: [PR] [feat][broker] PIP-264: Add topic messaging metrics [pulsar]
dragosvictor commented on PR #22467: URL: https://github.com/apache/pulsar/pull/22467#issuecomment-2071044457

> Changes look good.
>
> My only comment is that we should be using qualified names in the attributes:
>
> `pulsar.namespace`: includes the namespace portion only (e.g. `my-namespace`)
> `pulsar.topic`: includes the local topic name (e.g. `my-topic`)
>
> eg: `pulsar.namespace` -> `my-tenant/my-namespace` `pulsar.topic` -> `my-tenant/my-namespace/my-topic`
>
> This is to keep consistency with many other places in APIs and CLI tools. Also, consistent with OTel metrics attributes in the client SDK.

Note that such a transformation would occasionally lead the namespace to include the "cluster" portion in the case of old topic names: `pulsar.namespace="my-property/use/my-ns"`. The topic name, as currently filled in by the [client](https://github.com/apache/pulsar/blob/c72c135541e14043370836421cfef372b1d0a0ea/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java#L47), also includes the persistence part: `pulsar.topic="persistent://my-property/use/my-ns/testAllCompactedOut-07b9ad7f-89cb-4800-88e8-cb3417cf0406"`. If we can confirm that this is the intent here, I can proceed with this proposal. Alternatively, we could augment the existing implementation with one more attribute, say `pulsar.topic.complete.name`, even though it leads to repetition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [feat][broker] PIP-264: Add topic messaging metrics [pulsar]
merlimat commented on PR #22467: URL: https://github.com/apache/pulsar/pull/22467#issuecomment-2071140865 @dragosvictor I think we shouldn't worry about v1 topic names; these were deprecated long ago, and we should actually start to get rid of them completely. > The topic name, as is currently filled in by the [client](https://github.com/apache/pulsar/blob/c72c135541e14043370836421cfef372b1d0a0ea/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java#L47) also includes the persistence part: pulsar.topic="persistent://my-property/use/my-ns/testAllCompactedOut-07b9ad7f-89cb-4800-88e8-cb3417cf0406". Yes, it's better to include it, because it's part of the fully-qualified name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
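For illustration, the fully-qualified attribute convention being settled on in this thread could be sketched as follows. This is a hypothetical helper, not code from the PR; the function name and the parsing logic are assumptions based on the examples quoted above, and v1 topic names with a cluster portion are deliberately ignored, as suggested:

```python
def topic_attributes(topic: str) -> dict:
    """Derive OTel-style metric attributes from a fully-qualified topic name.

    pulsar.namespace carries "tenant/namespace"; pulsar.topic carries the
    fully-qualified name, including the persistence part.
    """
    _domain, rest = topic.split("://", 1)   # e.g. "persistent", "tenant/ns/topic"
    tenant, namespace, _local = rest.split("/", 2)
    return {
        "pulsar.namespace": f"{tenant}/{namespace}",
        "pulsar.topic": topic,
    }

attrs = topic_attributes("persistent://my-property/my-ns/my-topic")
```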
Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]
codecov-commenter commented on PR #22556: URL: https://github.com/apache/pulsar/pull/22556#issuecomment-2071211068

## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22556?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

Attention: Patch coverage is `86.7%` with `2 lines` in your changes missing coverage. Please review.

> Project coverage is 73.91%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`442bb30`)](https://app.codecov.io/gh/apache/pulsar/pull/22556?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
> Report is 177 commits behind head on master.

Additional details and impacted files

[![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22556/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22556?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)

```diff
@@             Coverage Diff              @@
##             master   #22556      +/-   ##
============================================
+ Coverage     73.57%   73.91%    +0.34%
- Complexity    32624    33055      +431
  Files          1877     1885        +8
  Lines        139502   140365      +863
  Branches      15299    15420      +121
+ Hits         102638   103753     +1115
+ Misses        28908    28569      -339
- Partials       7956     8043       +87
```

| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22556/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22556/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `26.72% <80.00%> (+2.13%)` | :arrow_up: |
| [systests](https://app.codecov.io/gh/apache/pulsar/pull/22556/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.56% <40.00%> (+0.24%)` | :arrow_up: |
| [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22556/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `73.20% <86.66%> (+0.36%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Files](https://app.codecov.io/gh/apache/pulsar/pull/22556?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [...oker/service/nonpersistent/NonPersistentTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/22556?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fnonpersistent%2FNonPersistentTopic.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL25vbnBlcnNpc3RlbnQvTm9uUGVyc2lzdGVudFRvcGljLmphdmE=) | `71.21% <85.71%> (+1.75%)` | :arrow_up: |
| [...sar/broker/service/persistent/PersistentTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/22556?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FPersistentTopic.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFRvcGljLmphdmE=) | `78.36% <87.50%> (-0.10%)` | :arrow_down: |

... and [257 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22556/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]
massakam commented on PR #22556: URL: https://github.com/apache/pulsar/pull/22556#issuecomment-2071211652 @lhotari PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode merged PR #21946: URL: https://github.com/apache/pulsar/pull/21946 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21946)
This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 49240522f54 [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21946) 49240522f54 is described below commit 49240522f543eea0e9307811c92b487eabe431d9 Author: fengyubiao AuthorDate: Tue Apr 23 09:23:08 2024 +0800 [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21946) --- .../pulsar/broker/service/AbstractReplicator.java | 332 - .../pulsar/broker/service/BrokerService.java | 2 +- .../apache/pulsar/broker/service/Replicator.java | 4 +- .../nonpersistent/NonPersistentReplicator.java | 5 +- .../service/nonpersistent/NonPersistentTopic.java | 10 +- .../service/persistent/PersistentReplicator.java | 87 +++--- .../broker/service/persistent/PersistentTopic.java | 31 +- .../broker/service/AbstractReplicatorTest.java | 22 +- .../broker/service/OneWayReplicatorTest.java | 276 - .../broker/service/OneWayReplicatorTestBase.java | 40 ++- .../pulsar/broker/service/PersistentTopicTest.java | 6 +- .../pulsar/broker/service/ReplicatorTest.java | 11 +- 12 files changed, 656 insertions(+), 170 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 1b5b2824257..f34144deb0a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -18,16 +18,22 @@ */ package org.apache.pulsar.broker.service; +import com.google.common.annotations.VisibleForTesting; import java.util.Optional; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import lombok.Getter; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.Backoff; @@ -39,7 +45,7 @@ import org.apache.pulsar.common.util.StringInterner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractReplicator { +public abstract class AbstractReplicator implements Replicator { protected final BrokerService brokerService; protected final String localTopicName; @@ -64,10 +70,31 @@ public abstract class AbstractReplicator { protected static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state"); -private volatile State state = State.Stopped; - -protected enum State { -Stopped, Starting, Started, Stopping +@VisibleForTesting +@Getter +protected volatile State state = State.Disconnected; + +public enum State { +/** + * This enum has two mean meanings: + * Init: replicator is just created, has not been started now. + * Disconnected: the producer was closed after {@link PersistentTopic#checkGC} called {@link #disconnect}. + */ +// The internal producer is disconnected. +Disconnected, +// Trying to create a new internal producer. +Starting, +// The internal producer has started, and tries copy data. 
+Started, +/** + * The producer is closing after {@link PersistentTopic#checkGC} called {@link #disconnect}. + */ +// The internal producer is trying to disconnect. +Disconnecting, +// The replicator is in terminating. +Terminating, +// The replicator is never used again. Pulsar will create a new Replicator when enable replication again. +Terminated; } public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, @@ -96,16 +123,16 @@ public abstract class AbstractReplicator { .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(prod
(pulsar-site) branch main updated: Docs sync done from apache/pulsar (#4924052)
This is an automated email from the ASF dual-hosted git repository. urfree pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new eaf427144d02 Docs sync done from apache/pulsar (#4924052) eaf427144d02 is described below commit eaf427144d02706b666ec3ac822b090f4f1d1227 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Tue Apr 23 01:34:38 2024 + Docs sync done from apache/pulsar (#4924052) --- .../next/config/reference-configuration-broker.md | 35 ++ .../config/reference-configuration-pulsar-proxy.md | 35 ++ .../config/reference-configuration-standalone.md | 35 ++ .../config/reference-configuration-websocket.md| 35 ++ static/swagger/master/swagger.json | 9 ++ static/swagger/master/v2/swagger.json | 9 ++ 6 files changed, 158 insertions(+) diff --git a/static/reference/next/config/reference-configuration-broker.md b/static/reference/next/config/reference-configuration-broker.md index cab9e7f7e7ba..2545e65d0b81 100644 --- a/static/reference/next/config/reference-configuration-broker.md +++ b/static/reference/next/config/reference-configuration-broker.md @@ -3868,6 +3868,29 @@ If enabled the feature that transaction pending ack log batch, this attribute me **Category**: Server +### webServiceHaProxyProtocolEnabled +Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. Default is false. + +**Type**: `boolean` + +**Default**: `false` + +**Dynamic**: `false` + +**Category**: Server + +### webServiceLogDetailedAddresses +Add detailed client/remote and server/local addresses and ports to http/https request logging. +Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. 
+ +**Type**: `java.lang.Boolean` + +**Default**: `null` + +**Dynamic**: `false` + +**Category**: Server + ### webServicePort The port for serving http requests @@ -3901,6 +3924,18 @@ Specify the TLS provider for the web service: SunJSSE, Conscrypt and etc. **Category**: Server +### webServiceTrustXForwardedFor +Trust X-Forwarded-For header for resolving the client IP for http/https requests. +Default is false. + +**Type**: `boolean` + +**Default**: `false` + +**Dynamic**: `false` + +**Category**: Server + ### bookkeeperClientAuthenticationParameters Parameters for bookkeeper auth plugin diff --git a/static/reference/next/config/reference-configuration-pulsar-proxy.md b/static/reference/next/config/reference-configuration-pulsar-proxy.md index 655b98e5dc51..f0895b2f888a 100644 --- a/static/reference/next/config/reference-configuration-pulsar-proxy.md +++ b/static/reference/next/config/reference-configuration-pulsar-proxy.md @@ -955,6 +955,29 @@ Path for the file used to determine the rotation status for the proxy instance w **Category**: Server +### webServiceHaProxyProtocolEnabled +Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. Default is false. + +**Type**: `boolean` + +**Default**: `false` + +**Dynamic**: `false` + +**Category**: Server + +### webServiceLogDetailedAddresses +Add detailed client/remote and server/local addresses and ports to http/https request logging. +Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. + +**Type**: `java.lang.Boolean` + +**Default**: `null` + +**Dynamic**: `false` + +**Category**: Server + ### webServicePort The port for serving http requests @@ -977,6 +1000,18 @@ The port for serving https requests **Category**: Server +### webServiceTrustXForwardedFor +Trust X-Forwarded-For header for resolving the client IP for http/https requests. +Default is false. 
+ +**Type**: `boolean` + +**Default**: `false` + +**Dynamic**: `false` + +**Category**: Server + ### tlsAllowInsecureConnection Accept untrusted TLS certificate from client. diff --git a/static/reference/next/config/reference-configuration-standalone.md b/static/reference/next/config/reference-configuration-standalone.md index cab9e7f7e7ba..2545e65d0b81 100644 --- a/static/reference/next/config/reference-configuration-standalone.md +++ b/static/reference/next/config/reference-configuration-standalone.md @@ -3868,6 +3868,29 @@ If enabled the feature that transaction pending ack log batch, this attribute me **Category**: Server +### webServiceHaProxyProtocolEnabled +Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. Default is false. + +**Type**: `boolean` + +**Default**: `false` + +**Dynamic**: `false` + +**Category**: Server + +### webServiceLogDetailedAddresses +Add detailed client/remote and server/local addresses and ports to http/https request loggin
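As a quick reference for the three new settings documented in this sync, here is a hedged `broker.conf` fragment for a broker behind an HTTP load balancer — the values are illustrative assumptions, not recommendations:

```properties
# Illustrative broker.conf fragment (assumption: broker behind a load balancer
# that sets X-Forwarded-For but does not speak the HA proxy protocol).

# Trust X-Forwarded-For so logged client IPs reflect the real client.
webServiceTrustXForwardedFor=true

# Leave HA proxy protocol off unless the load balancer actually sends it.
webServiceHaProxyProtocolEnabled=false

# Defaults to true when either of the two settings above is enabled;
# set it explicitly only to override that default.
webServiceLogDetailedAddresses=true
```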
[PR] [improve][ml] Make ManagedLedger read entries parallel [pulsar]
dao-jun opened a new pull request, #22560: URL: https://github.com/apache/pulsar/pull/22560 ### Motivation In https://github.com/apache/pulsar/pull/19035 we introduced `skipCondition` to filter out delayed-delivery messages before reading entries from BookKeeper, and in https://github.com/apache/pulsar/pull/21739 we also filter out deleted (individually acked) messages before reading entries. However, this leads to one situation: a single contiguous segment can be split into multiple segments. For example, with entries to filter out [3, 5, 7] and entries to read [1, 10], the read is split into [[1,2], [4], [6], [8,10]]. In the current implementation, reading [4] only begins after reading [1,2] finishes, reading [6] only begins after reading [4] finishes, and so on. This increases latency, keeps memory (allocated for entries) retained for a longer period of time, and hurts the throughput of the system. ### Modifications ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
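To illustrate the splitting behavior described in the motivation above, here is a small self-contained sketch — not Pulsar's actual ManagedLedger code — of how a contiguous read range breaks into sub-ranges once a skip condition removes entries:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative sketch: split the contiguous entry range [start, end] into
// the contiguous sub-ranges that remain after removing skipped entry ids.
public class RangeSplitExample {
    static List<long[]> splitRange(long start, long end, Set<Long> skip) {
        List<long[]> ranges = new ArrayList<>();
        long rangeStart = -1;
        for (long id = start; id <= end; id++) {
            if (skip.contains(id)) {
                if (rangeStart != -1) {
                    // A skipped entry closes the current sub-range.
                    ranges.add(new long[]{rangeStart, id - 1});
                    rangeStart = -1;
                }
            } else if (rangeStart == -1) {
                rangeStart = id;  // a non-skipped entry opens a new sub-range
            }
        }
        if (rangeStart != -1) {
            ranges.add(new long[]{rangeStart, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Entries to read: [1, 10]; entries to filter out: {3, 5, 7}.
        for (long[] r : splitRange(1, 10, Set.of(3L, 5L, 7L))) {
            System.out.println(r[0] + "-" + r[1]);
        }
        // Prints the four sub-ranges: 1-2, 4-4, 6-6, 8-10.
    }
}
```

Reading these four sub-ranges one after another is what the PR identifies as the latency problem; issuing the reads in parallel lets them complete concurrently instead.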
Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]
BewareMyPower commented on code in PR #1208: URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575573046 ## pulsar/internal/pulsar_proto/PulsarApi.pb.go: ## @@ -18,8 +18,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.9 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 Review Comment: Yeah, when I regenerated it locally, I also found only the version comments were changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]
massakam commented on PR #22556: URL: https://github.com/apache/pulsar/pull/22556#issuecomment-2071325099 Resolved the conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes [pulsar]
Awsmsniper commented on PR #22501: URL: https://github.com/apache/pulsar/pull/22501#issuecomment-2071364851 > @Awsmsniper it should be another issue, could you please create a new issue first? > > Also, https://hub.docker.com/layers/freeznet/pulsar-all/3.0.5-SNAPSHOT-1c46877/images/sha256-d881a766d57b77ec4b279688b96b714f34eb270496b052eb69362db5c62d8bd3?context=repo is the docker image I built from this PR based on branch-3.0, you may check if this fix solves your issue as well. I did not use the version you provided. After using version 3.0.4, I found that it can be used normally. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [improve] [broker] add role for consumer stat [pulsar]
thetumbled opened a new pull request, #22561: URL: https://github.com/apache/pulsar/pull/22561 ### Motivation Via the `allowAutoSubscriptionCreation` config, we allow users to create subscriptions automatically if they have the `consume` permission. We have a strong need to trace back who owns a specific subscription, but currently there is no way to achieve that. ### Modifications Add a `role` field to the stats response. ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] add role for consumer stat [pulsar]
thetumbled closed pull request #22561: [improve] [broker] add role for consumer stat URL: https://github.com/apache/pulsar/pull/22561 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [improve] [broker] add role for consumer stat [pulsar]
thetumbled opened a new pull request, #22562: URL: https://github.com/apache/pulsar/pull/22562 ### Motivation Via the `allowAutoSubscriptionCreation` config, we allow users to create subscriptions automatically if they have the `consume` permission. We have a strong need to trace back who owns a specific subscription, but currently there is no way to achieve that. ### Modifications Add a `role` field to the stats response. ### Verifying this change - [x] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [x] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
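For context, per-consumer details are already visible via `pulsar-admin topics stats`; the hypothetical excerpt below sketches where a `role` field such as the one proposed in this PR could surface. The field name, placement, and all values are assumptions based on this PR's description, not released Pulsar behavior:

```json
{
  "subscriptions": {
    "my-sub": {
      "consumers": [
        {
          "consumerName": "consumer-1",
          "address": "/10.0.0.5:54321",
          "role": "analytics-team"
        }
      ]
    }
  }
}
```

With such a field, an operator could answer "who created this auto-created subscription?" directly from the stats output instead of correlating broker logs with authentication records.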