tuteng edited a comment on issue #5454: mysql JDBC Sink - consumer error
URL: https://github.com/apache/pulsar/issues/5454#issuecomment-545688370
Please show your producer's code.
The following is an example, please refer to
```
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Foo3 {
public String field1;
public String field2;
public int field3;
}
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
AvroSchema schema =
AvroSchema.of(SchemaDefinition.builder().withPojo(Foo3.class).withAlwaysAllowNull(false).build());
Producer producer = client.newProducer(schema)
.topic("test-jdbc")
.create();
for (int i = 0; i < 20; i++) {
String key = "key-" + i;
Foo3 obj = new Foo3();
obj.setField1("field1_insert_" + i);
obj.setField2("field2_insert_" + i);
obj.setField3(i);
Map properties = Maps.newHashMap();
properties.put("EVENT", "INSERT");
producer.newMessage()
.properties(properties)
.key(key)
.value(obj)
.send();
}
for (int i = 0; i < 20; i++) {
String key = "key-" + i;
Foo3 obj = new Foo3();
obj.setField1("field1_insert_" + i);
obj.setField2("field2_update_" + i);
obj.setField3(i);
Map properties = Maps.newHashMap();
properties.put("EVENT", "UPDATE");
producer.newMessage()
.properties(properties)
.key(key)
.value(obj)
.send();
}
for (int i = 0; i < 20; i++) {
String key = "key-" + i;
Foo3 obj = new Foo3();
obj.setField1("field1_insert_" + i);
obj.setField2("field2_delete_" + i);
obj.setField3(i);
Map properties = Maps.newHashMap();
properties.put("EVENT", "DELETE");
producer.newMessage()
.properties(properties)
.key(key)
.value(obj)
.send();
}
producer.close();
client.close();
```
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
With regards,
Apache Git Services