This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git
The following commit(s) were added to refs/heads/main by this push: new df8ff29a7d92 [improve][doc] add golang transaction demo in txn-use.md (#948) df8ff29a7d92 is described below commit df8ff29a7d92ec539fe626a41b0c9277bbbae1b7 Author: zhou zhuohan <843520...@qq.com> AuthorDate: Mon Aug 12 18:49:51 2024 +0800 [improve][doc] add golang transaction demo in txn-use.md (#948) Co-authored-by: ninjazhou <ninjaz...@tencent.com> --- docs/txn-use.md | 131 +++++++++++++++++++++++++++++++- versioned_docs/version-3.0.x/txn-use.md | 131 +++++++++++++++++++++++++++++++- versioned_docs/version-3.1.x/txn-use.md | 131 +++++++++++++++++++++++++++++++- versioned_docs/version-3.2.x/txn-use.md | 131 +++++++++++++++++++++++++++++++- versioned_docs/version-3.3.x/txn-use.md | 131 +++++++++++++++++++++++++++++++- 5 files changed, 650 insertions(+), 5 deletions(-) diff --git a/docs/txn-use.md b/docs/txn-use.md index 6d91283422a5..193a03de4dbb 100644 --- a/docs/txn-use.md +++ b/docs/txn-use.md @@ -54,7 +54,7 @@ To use Pulsar transaction API, complete the following steps. Transaction coordinator metadata setup success ``` -3. Create a Pulsar client and enable transactions. +3. Create a Pulsar client and enable transactions. Because the client needs to discover the transaction coordinator from a system topic, make sure your client role has produce/consume permissions on the system namespace `pulsar/system`. 4. Create producers and consumers. @@ -80,6 +80,13 @@ To use Pulsar transaction API, complete the following steps. **Input** +````mdx-code-block +<Tabs groupId="api-choice" + defaultValue="Java" + values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}> + +<TabItem value="Java"> + ```java PulsarClient client = PulsarClient.builder() // Step 3: create a Pulsar client and enable transactions. @@ -162,6 +169,128 @@ To use Pulsar transaction API, complete the following steps. } ``` +</TabItem> +<TabItem value="Go"> + + ```go + // Step 3: create a Pulsar client and enable transactions.
+ client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: "<serviceUrl>", + EnableTransaction: true, + }) + if err != nil { + log.Fatalf("create client fail, err = %v", err) + } + defer client.Close() + // Step 4: create three producers to produce messages to input and output topics. + inputTopic := "inputTopic" + outputTopicOne := "outputTopicOne" + outputTopicTwo := "outputTopicTwo" + subscriptionName := "your-subscription-name" + inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: inputTopic, + SendTimeout: 0, + }) + defer inputProducer.Close() + outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicOne, + SendTimeout: 0, + }) + defer outputProducerOne.Close() + outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicTwo, + SendTimeout: 0, + }) + defer outputProducerTwo.Close() + + // Step 4: create three consumers to consume messages from input and output topics. + inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: inputTopic, + SubscriptionName: subscriptionName, + }) + defer inputConsumer.Close() + outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicOne, + SubscriptionName: subscriptionName, + }) + defer outputConsumerOne.Close() + outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicTwo, + SubscriptionName: subscriptionName, + }) + defer outputConsumerTwo.Close() + + // Step 5: produce messages to input topics. + ctx := context.Background() + count := 2 + for i := 0; i < count; i++ { + inputProducer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)), + }) + } + // Step 5: consume messages and produce them to output topics with transactions. + for i := 0; i < count; i++ { + // Step 5: the consumer successfully receives messages. 
+ message, err := inputConsumer.Receive(ctx) + if err != nil { + log.Printf("receive message from %s fail, err = %v", inputTopic, err) + continue + } + // Step 6: create transactions. + // The transaction timeout is specified as 10 seconds. + // If the transaction is not committed within 10 seconds, the transaction is automatically aborted. + txn, err := client.NewTransaction(10 * time.Second) + if err != nil { + log.Printf("create txn fail, err = %v", err) + continue + } + // Step 6: you can process the received message with your use case and business logic. + // processMessage(message) + // Step 7: the producers produce messages to output topics with transactions + _, err = outputProducerOne.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", i)), + }) + if err != nil { + log.Printf("send to producerOne fail %v", err) + txn.Abort(ctx) + } + _, err = outputProducerTwo.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)), + }) + if err != nil { + log.Printf("send to producerTwo fail %v", err) + txn.Abort(ctx) + } + // Step 7: the consumers acknowledge the input message with the transactions *individually*. + err = inputConsumer.AckWithTxn(message, txn) + if err != nil { + log.Printf("ack message fail %v", err) + txn.Abort(ctx) + } + // Step 8: commit transactions. + err = txn.Commit(ctx) + if err != nil { + log.Printf("commit txn fail %v", err) + } + } + + // Final result: consume messages from output topics and print them. 
+ for i := 0; i < count; i++ { + message, _ := outputConsumerOne.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + for i := 0; i < count; i++ { + message, _ := outputConsumerTwo.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + ``` + +</TabItem> +</Tabs> +```` + **Output** ```java diff --git a/versioned_docs/version-3.0.x/txn-use.md b/versioned_docs/version-3.0.x/txn-use.md index 9b19bc7ef542..6886ea8a83ca 100644 --- a/versioned_docs/version-3.0.x/txn-use.md +++ b/versioned_docs/version-3.0.x/txn-use.md @@ -49,7 +49,7 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0 Transaction coordinator metadata setup success ``` -3. Create a Pulsar client and enable transactions. +3. Create a Pulsar client and enable transactions. Because the client needs to discover the transaction coordinator from a system topic, make sure your client role has produce/consume permissions on the system namespace `pulsar/system`. 4. Create producers and consumers. @@ -75,6 +75,13 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0 **Input** +````mdx-code-block +<Tabs groupId="api-choice" + defaultValue="Java" + values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}> + +<TabItem value="Java"> + ```java PulsarClient client = PulsarClient.builder() // Step 3: create a Pulsar client and enable transactions. @@ -157,6 +164,128 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0 } ``` +</TabItem> +<TabItem value="Go"> + + ```go + // Step 3: create a Pulsar client and enable transactions. + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: "<serviceUrl>", + EnableTransaction: true, + }) + if err != nil { + log.Fatalf("create client fail, err = %v", err) + } + defer client.Close() + // Step 4: create three producers to produce messages to input and output topics.
+ inputTopic := "inputTopic" + outputTopicOne := "outputTopicOne" + outputTopicTwo := "outputTopicTwo" + subscriptionName := "your-subscription-name" + inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: inputTopic, + SendTimeout: 0, + }) + defer inputProducer.Close() + outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicOne, + SendTimeout: 0, + }) + defer outputProducerOne.Close() + outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicTwo, + SendTimeout: 0, + }) + defer outputProducerTwo.Close() + + // Step 4: create three consumers to consume messages from input and output topics. + inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: inputTopic, + SubscriptionName: subscriptionName, + }) + defer inputConsumer.Close() + outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicOne, + SubscriptionName: subscriptionName, + }) + defer outputConsumerOne.Close() + outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicTwo, + SubscriptionName: subscriptionName, + }) + defer outputConsumerTwo.Close() + + // Step 5: produce messages to input topics. + ctx := context.Background() + count := 2 + for i := 0; i < count; i++ { + inputProducer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)), + }) + } + // Step 5: consume messages and produce them to output topics with transactions. + for i := 0; i < count; i++ { + // Step 5: the consumer successfully receives messages. + message, err := inputConsumer.Receive(ctx) + if err != nil { + log.Printf("receive message from %s fail, err = %v", inputTopic, err) + continue + } + // Step 6: create transactions. + // The transaction timeout is specified as 10 seconds. + // If the transaction is not committed within 10 seconds, the transaction is automatically aborted. 
+ txn, err := client.NewTransaction(10 * time.Second) + if err != nil { + log.Printf("create txn fail, err = %v", err) + continue + } + // Step 6: you can process the received message with your use case and business logic. + // processMessage(message) + // Step 7: the producers produce messages to output topics with transactions + _, err = outputProducerOne.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", i)), + }) + if err != nil { + log.Printf("send to producerOne fail %v", err) + txn.Abort(ctx) + } + _, err = outputProducerTwo.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)), + }) + if err != nil { + log.Printf("send to producerTwo fail %v", err) + txn.Abort(ctx) + } + // Step 7: the consumers acknowledge the input message with the transactions *individually*. + err = inputConsumer.AckWithTxn(message, txn) + if err != nil { + log.Printf("ack message fail %v", err) + txn.Abort(ctx) + } + // Step 8: commit transactions. + err = txn.Commit(ctx) + if err != nil { + log.Printf("commit txn fail %v", err) + } + } + + // Final result: consume messages from output topics and print them. 
+ for i := 0; i < count; i++ { + message, _ := outputConsumerOne.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + for i := 0; i < count; i++ { + message, _ := outputConsumerTwo.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + ``` + +</TabItem> +</Tabs> +```` + **Output** ```java diff --git a/versioned_docs/version-3.1.x/txn-use.md b/versioned_docs/version-3.1.x/txn-use.md index 9b19bc7ef542..48cc5a840aaa 100644 --- a/versioned_docs/version-3.1.x/txn-use.md +++ b/versioned_docs/version-3.1.x/txn-use.md @@ -49,7 +49,7 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0 Transaction coordinator metadata setup success ``` -3. Create a Pulsar client and enable transactions. +3. Create a Pulsar client and enable transactions. Because the client needs to discover the transaction coordinator from a system topic, make sure your client role has produce/consume permissions on the system namespace `pulsar/system`. 4. Create producers and consumers. @@ -75,6 +75,13 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0 **Input** +````mdx-code-block +<Tabs groupId="api-choice" + defaultValue="Java" + values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}> + +<TabItem value="Java"> + ```java PulsarClient client = PulsarClient.builder() // Step 3: create a Pulsar client and enable transactions. @@ -157,6 +164,128 @@ Currently, [Pulsar transaction API](/api/admin/) is available in **Pulsar 2.8.0 } ``` +</TabItem> +<TabItem value="Go"> + + ```go + // Step 3: create a Pulsar client and enable transactions. + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: "<serviceUrl>", + EnableTransaction: true, + }) + if err != nil { + log.Fatalf("create client fail, err = %v", err) + } + defer client.Close() + // Step 4: create three producers to produce messages to input and output topics.
+ inputTopic := "inputTopic" + outputTopicOne := "outputTopicOne" + outputTopicTwo := "outputTopicTwo" + subscriptionName := "your-subscription-name" + inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: inputTopic, + SendTimeout: 0, + }) + defer inputProducer.Close() + outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicOne, + SendTimeout: 0, + }) + defer outputProducerOne.Close() + outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicTwo, + SendTimeout: 0, + }) + defer outputProducerTwo.Close() + + // Step 4: create three consumers to consume messages from input and output topics. + inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: inputTopic, + SubscriptionName: subscriptionName, + }) + defer inputConsumer.Close() + outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicOne, + SubscriptionName: subscriptionName, + }) + defer outputConsumerOne.Close() + outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicTwo, + SubscriptionName: subscriptionName, + }) + defer outputConsumerTwo.Close() + + // Step 5: produce messages to input topics. + ctx := context.Background() + count := 2 + for i := 0; i < count; i++ { + inputProducer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)), + }) + } + // Step 5: consume messages and produce them to output topics with transactions. + for i := 0; i < count; i++ { + // Step 5: the consumer successfully receives messages. + message, err := inputConsumer.Receive(ctx) + if err != nil { + log.Printf("receive message from %s fail, err = %v", inputTopic, err) + continue + } + // Step 6: create transactions. + // The transaction timeout is specified as 10 seconds. + // If the transaction is not committed within 10 seconds, the transaction is automatically aborted. 
+ txn, err := client.NewTransaction(10 * time.Second) + if err != nil { + log.Printf("create txn fail, err = %v", err) + continue + } + // Step 6: you can process the received message with your use case and business logic. + // processMessage(message) + // Step 7: the producers produce messages to output topics with transactions + _, err = outputProducerOne.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicOne count : %d", i)), + }) + if err != nil { + log.Printf("send to producerOne fail %v", err) + txn.Abort(ctx) + } + _, err = outputProducerTwo.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)), + }) + if err != nil { + log.Printf("send to producerTwo fail %v", err) + txn.Abort(ctx) + } + // Step 7: the consumers acknowledge the input message with the transactions *individually*. + err = inputConsumer.AckWithTxn(message, txn) + if err != nil { + log.Printf("ack message fail %v", err) + txn.Abort(ctx) + } + // Step 8: commit transactions. + err = txn.Commit(ctx) + if err != nil { + log.Printf("commit txn fail %v", err) + } + } + + // Final result: consume messages from output topics and print them. + for i := 0; i < count; i++ { + message, _ := outputConsumerOne.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + for i := 0; i < count; i++ { + message, _ := outputConsumerTwo.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + ``` + +</TabItem> +</Tabs> +```` + **Output** ```java diff --git a/versioned_docs/version-3.2.x/txn-use.md b/versioned_docs/version-3.2.x/txn-use.md index 6d91283422a5..193a03de4dbb 100644 --- a/versioned_docs/version-3.2.x/txn-use.md +++ b/versioned_docs/version-3.2.x/txn-use.md @@ -54,7 +54,7 @@ To use Pulsar transaction API, complete the following steps. 
Transaction coordinator metadata setup success ``` -3. Create a Pulsar client and enable transactions. +3. Create a Pulsar client and enable transactions. Because the client needs to discover the transaction coordinator from a system topic, make sure your client role has produce/consume permissions on the system namespace `pulsar/system`. 4. Create producers and consumers. @@ -80,6 +80,13 @@ To use Pulsar transaction API, complete the following steps. **Input** +````mdx-code-block +<Tabs groupId="api-choice" + defaultValue="Java" + values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}> + +<TabItem value="Java"> + ```java PulsarClient client = PulsarClient.builder() // Step 3: create a Pulsar client and enable transactions. @@ -162,6 +169,128 @@ To use Pulsar transaction API, complete the following steps. } ``` +</TabItem> +<TabItem value="Go"> + + ```go + // Step 3: create a Pulsar client and enable transactions. + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: "<serviceUrl>", + EnableTransaction: true, + }) + if err != nil { + log.Fatalf("create client fail, err = %v", err) + } + defer client.Close() + // Step 4: create three producers to produce messages to input and output topics. + inputTopic := "inputTopic" + outputTopicOne := "outputTopicOne" + outputTopicTwo := "outputTopicTwo" + subscriptionName := "your-subscription-name" + inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: inputTopic, + SendTimeout: 0, + }) + defer inputProducer.Close() + outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicOne, + SendTimeout: 0, + }) + defer outputProducerOne.Close() + outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicTwo, + SendTimeout: 0, + }) + defer outputProducerTwo.Close() + + // Step 4: create three consumers to consume messages from input and output topics.
+ inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: inputTopic, + SubscriptionName: subscriptionName, + }) + defer inputConsumer.Close() + outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicOne, + SubscriptionName: subscriptionName, + }) + defer outputConsumerOne.Close() + outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicTwo, + SubscriptionName: subscriptionName, + }) + defer outputConsumerTwo.Close() + + // Step 5: produce messages to input topics. + ctx := context.Background() + count := 2 + for i := 0; i < count; i++ { + inputProducer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)), + }) + } + // Step 5: consume messages and produce them to output topics with transactions. + for i := 0; i < count; i++ { + // Step 5: the consumer successfully receives messages. + message, err := inputConsumer.Receive(ctx) + if err != nil { + log.Printf("receive message from %s fail, err = %v", inputTopic, err) + continue + } + // Step 6: create transactions. + // The transaction timeout is specified as 10 seconds. + // If the transaction is not committed within 10 seconds, the transaction is automatically aborted. + txn, err := client.NewTransaction(10 * time.Second) + if err != nil { + log.Printf("create txn fail, err = %v", err) + continue + } + // Step 6: you can process the received message with your use case and business logic. + // processMessage(message) + // Step 7: the producers produce messages to output topics with transactions + _, err = outputProducerOne.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicOne count : %d", i)), + }) + if err != nil { + log.Printf("send to producerOne fail %v", err) + txn.Abort(ctx) + } + _, err = outputProducerTwo.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)), + }) + if err != nil { + log.Printf("send to producerTwo fail %v", err) + txn.Abort(ctx) + } + // Step 7: the consumers acknowledge the input message with the transactions *individually*. + err = inputConsumer.AckWithTxn(message, txn) + if err != nil { + log.Printf("ack message fail %v", err) + txn.Abort(ctx) + } + // Step 8: commit transactions. + err = txn.Commit(ctx) + if err != nil { + log.Printf("commit txn fail %v", err) + } + } + + // Final result: consume messages from output topics and print them. + for i := 0; i < count; i++ { + message, _ := outputConsumerOne.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + for i := 0; i < count; i++ { + message, _ := outputConsumerTwo.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + ``` + +</TabItem> +</Tabs> +```` + **Output** ```java diff --git a/versioned_docs/version-3.3.x/txn-use.md b/versioned_docs/version-3.3.x/txn-use.md index 6d91283422a5..193a03de4dbb 100644 --- a/versioned_docs/version-3.3.x/txn-use.md +++ b/versioned_docs/version-3.3.x/txn-use.md @@ -54,7 +54,7 @@ To use Pulsar transaction API, complete the following steps. Transaction coordinator metadata setup success ``` -3. Create a Pulsar client and enable transactions. +3. Create a Pulsar client and enable transactions. Because the client needs to discover the transaction coordinator from a system topic, make sure your client role has produce/consume permissions on the system namespace `pulsar/system`. 4. Create producers and consumers. @@ -80,6 +80,13 @@ To use Pulsar transaction API, complete the following steps.
**Input** +````mdx-code-block +<Tabs groupId="api-choice" + defaultValue="Java" + values={[{"label":"Java","value":"Java"},{"label":"Go","value":"Go"}]}> + +<TabItem value="Java"> + ```java PulsarClient client = PulsarClient.builder() // Step 3: create a Pulsar client and enable transactions. @@ -162,6 +169,128 @@ To use Pulsar transaction API, complete the following steps. } ``` +</TabItem> +<TabItem value="Go"> + + ```go + // Step 3: create a Pulsar client and enable transactions. + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: "<serviceUrl>", + EnableTransaction: true, + }) + if err != nil { + log.Fatalf("create client fail, err = %v", err) + } + defer client.Close() + // Step 4: create three producers to produce messages to input and output topics. + inputTopic := "inputTopic" + outputTopicOne := "outputTopicOne" + outputTopicTwo := "outputTopicTwo" + subscriptionName := "your-subscription-name" + inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: inputTopic, + SendTimeout: 0, + }) + defer inputProducer.Close() + outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicOne, + SendTimeout: 0, + }) + defer outputProducerOne.Close() + outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicTwo, + SendTimeout: 0, + }) + defer outputProducerTwo.Close() + + // Step 4: create three consumers to consume messages from input and output topics. 
+ inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: inputTopic, + SubscriptionName: subscriptionName, + }) + defer inputConsumer.Close() + outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicOne, + SubscriptionName: subscriptionName, + }) + defer outputConsumerOne.Close() + outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic: outputTopicTwo, + SubscriptionName: subscriptionName, + }) + defer outputConsumerTwo.Close() + + // Step 5: produce messages to input topics. + ctx := context.Background() + count := 2 + for i := 0; i < count; i++ { + inputProducer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)), + }) + } + // Step 5: consume messages and produce them to output topics with transactions. + for i := 0; i < count; i++ { + // Step 5: the consumer successfully receives messages. + message, err := inputConsumer.Receive(ctx) + if err != nil { + log.Printf("receive message from %s fail, err = %v", inputTopic, err) + continue + } + // Step 6: create transactions. + // The transaction timeout is specified as 10 seconds. + // If the transaction is not committed within 10 seconds, the transaction is automatically aborted. + txn, err := client.NewTransaction(10 * time.Second) + if err != nil { + log.Printf("create txn fail, err = %v", err) + continue + } + // Step 6: you can process the received message with your use case and business logic. + // processMessage(message) + // Step 7: the producers produce messages to output topics with transactions + _, err = outputProducerOne.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! 
outputTopicOne count : %d", i)), + }) + if err != nil { + log.Printf("send to producerOne fail %v", err) + txn.Abort(ctx) + } + _, err = outputProducerTwo.Send(context.Background(), &pulsar.ProducerMessage{ + Transaction: txn, + Payload: []byte(fmt.Sprintf("Hello Pulsar! outputTopicTwo count : %d", i)), + }) + if err != nil { + log.Printf("send to producerTwo fail %v", err) + txn.Abort(ctx) + } + // Step 7: the consumers acknowledge the input message with the transactions *individually*. + err = inputConsumer.AckWithTxn(message, txn) + if err != nil { + log.Printf("ack message fail %v", err) + txn.Abort(ctx) + } + // Step 8: commit transactions. + err = txn.Commit(ctx) + if err != nil { + log.Printf("commit txn fail %v", err) + } + } + + // Final result: consume messages from output topics and print them. + for i := 0; i < count; i++ { + message, _ := outputConsumerOne.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + for i := 0; i < count; i++ { + message, _ := outputConsumerTwo.Receive(ctx) + log.Printf("Receive transaction message: %s", string(message.Payload())) + } + ``` + +</TabItem> +</Tabs> +```` + **Output** ```java