alexandrebrilhante opened a new issue, #22543:
URL: https://github.com/apache/pulsar/issues/22543

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [X] I understand that unsupported versions don't get bug fixes. I will 
attempt to reproduce the issue on a supported version of Pulsar client and 
Pulsar broker.
   
   
   ### Version
   
   OS: macOS Sonoma 14.4.1
   Java: OpenJDK 17.0.11
   Pulsar: 3.2.1
   
   ### Minimal reproduce step
   
   The example detailed 
[here](https://pulsar.apache.org/docs/next/io-quickstart/#connect-pulsar-to-postgresql)
 seems outdated. I've followed every step but still can't see new records in 
PostgreSQL. For comparison, there seems to be no issue when switching to 
[Cassandra](https://pulsar.apache.org/docs/next/io-quickstart/#connect-pulsar-to-cassandra)
 with the same schema and producer setup. I've tried with both local and 
Dockerized Postgres databases.
   
   `pulsar-postgres-jdbc-sink.yaml`
   ```yaml
   configs:
     userName: "postgres"
     password: "postgres"
     jdbcUrl: "jdbc:postgresql://localhost:5432/postgres"
     tableName: "pulsar_postgres_jdbc_sink"
   ```
   
   `schema`
   ```json
   {
     "type": "AVRO",
     "schema": "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
     "properties": {}
   }
   ```
   
   `main.rs` - dummy data, e.g. `{"id": 1, "name": "abcdefg"}`, arrives on 
127.0.0.1:9999, and `main` then sends it to Pulsar.
   ```rust
   use pulsar::{producer::ProducerOptions, Pulsar, TokioExecutor};
   use tokio::{io::AsyncReadExt, net::TcpListener, sync::mpsc};
   
   #[tokio::main]
   async fn main() {
       let addr: &str = "pulsar://localhost:6650";
   
       let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor)
           .build()
           .await
           .expect("Failed to connect to Pulsar...");
   
       let topic_name: &str = "persistent://public/default/test_cassandra";
   
       let mut producer: pulsar::Producer<TokioExecutor> = pulsar
           .producer()
           .with_topic(topic_name)
           .with_name("producer")
           .with_options(ProducerOptions {
               batch_size: Some(4),
               ..Default::default()
           })
           .build()
           .await
           .expect("Failed to create producer...");
   
       let (tx, mut rx) = mpsc::channel(100);
   
       let _producer_task: tokio::task::JoinHandle<()> = tokio::spawn(async move {
           while let Some(message) = rx.recv().await {
               match producer.send(message).await {
                   Ok(_) => println!("Message sent to Pulsar..."),
                   Err(e) => eprintln!("Failed to send message to Pulsar; err = {:?}...", e),
               }
           }
       });
   
       let listener: TcpListener = TcpListener::bind("127.0.0.1:9999")
           .await
           .expect("Failed to bind to address...");
   
       loop {
           let (mut socket, _addr) = listener
               .accept()
               .await
               .expect("Failed to accept connection...");
   
           let tx: mpsc::Sender<String> = tx.clone();
   
           tokio::spawn(async move {
               let mut buf: [u8; 1024] = [0; 1024];
   
               loop {
                   let n: usize = match socket.read(&mut buf).await {
                       Ok(n) if n == 0 => return,
                       Ok(n) => n,
                       Err(e) => {
                               eprintln!("Failed to read from socket; err = {:?}...", e);
                           return;
                       }
                   };
   
                       let message = String::from_utf8_lossy(&buf[0..n]).to_string();
   
                   if tx.send(message).await.is_err() {
                       eprintln!("Failed to send message to channel...");
                       return;
                   }
               }
           });
       }
   }
   ```
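
For anyone reproducing this, the dummy data source is not shown above. A minimal sketch of one, assuming a plain TCP client that writes a single JSON text record to the listener on 127.0.0.1:9999, could look like this. The helper names `dummy_record` and `send_record` are hypothetical and not part of the original setup.

```rust
use std::io::{self, Write};
use std::net::TcpStream;

/// Build one dummy record as JSON text with the `id` and `name` fields
/// used by the schema above. `name` is inserted verbatim (no escaping),
/// which is only adequate for simple test strings.
fn dummy_record(id: i32, name: &str) -> String {
    format!("{{\"id\": {}, \"name\": \"{}\"}}", id, name)
}

/// Connect to `addr` and write one record over TCP.
/// The connection is closed when `stream` goes out of scope.
fn send_record(addr: &str, record: &str) -> io::Result<()> {
    let mut stream = TcpStream::connect(addr)?;
    stream.write_all(record.as_bytes())?;
    stream.flush()
}
```

With `main.rs` running, calling `send_record("127.0.0.1:9999", &dummy_record(1, "abcdefg"))` should hand one record to the listener, which forwards it to Pulsar as a string.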
   
   Complete setup:
   ```bash
   bin/pulsar standalone
   
   bin/pulsar-admin schemas upload pulsar-postgres-jdbc-sink-topic -f $PWD/pulsar/connectors/schema
   
   bin/pulsar-admin sinks create \
       --archive $PWD/pulsar/connectors/pulsar-io-jdbc-postgres-3.2.2.nar \
       --inputs pulsar-postgres-jdbc-sink-topic \
       --name pulsar-postgres-jdbc-sink \
       --sink-config-file $PWD/pulsar/connectors/pulsar-postgres-jdbc-sink.yaml \
       --parallelism 1
   
   cargo build --release && cargo run --release
   ```
   
   ### What did you expect to see?
   
   PostgreSQL table `pulsar_postgres_jdbc_sink` being populated in real-time.
   
   ### What did you see instead?
   
   PostgreSQL table `pulsar_postgres_jdbc_sink` is empty, although Pulsar is 
producing the messages properly.
   
   ### Anything else?
   
   No issues when inspecting the sink or the topic. Pulsar is able to produce 
the messages.
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


