dlg99 commented on code in PR #18017:
URL: https://github.com/apache/pulsar/pull/18017#discussion_r997545101
##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -213,63 +224,90 @@ protected enum MutationType {
private void flush() {
- // if not in flushing state, do flush, else return;
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
- if (log.isDebugEnabled()) {
- log.debug("Starting flush, queue size: {}",
incomingList.size());
- }
- if (!swapList.isEmpty()) {
- throw new IllegalStateException("swapList should be empty
since last flush. swapList.size: "
- + swapList.size());
- }
- synchronized (this) {
- List<Record<T>> tmpList;
- swapList.clear();
+ boolean needAnotherRound;
+ final Deque<Record<T>> swapList = new LinkedList<>();
+
+ synchronized (incomingList) {
+ if (log.isDebugEnabled()) {
+ log.debug("Starting flush, queue size: {}",
incomingList.size());
+ }
+ final int actualBatchSize = batchSize > 0 ?
Math.min(incomingList.size(), batchSize) :
+ incomingList.size();
- tmpList = swapList;
- swapList = incomingList;
- incomingList = tmpList;
+ for (int i = 0; i < actualBatchSize; i++) {
+ swapList.add(incomingList.removeFirst());
+ }
+ needAnotherRound = batchSize > 0 && !incomingList.isEmpty() &&
incomingList.size() >= batchSize;
}
+ long start = System.nanoTime();
int count = 0;
try {
+ PreparedStatement currentBatch = null;
+ final List<Mutation> mutations = swapList
+ .stream()
+ .map(this::createMutation)
+ .collect(Collectors.toList());
// bind each record value
- for (Record<T> record : swapList) {
- final Mutation mutation = createMutation(record);
+ PreparedStatement statement;
+ for (Mutation mutation : mutations) {
switch (mutation.getType()) {
case DELETE:
- bindValue(deleteStatement, mutation);
- count += 1;
- deleteStatement.execute();
+ statement = deleteStatement;
break;
case UPDATE:
- bindValue(updateStatement, mutation);
- count += 1;
- updateStatement.execute();
+ statement = updateStatement;
break;
case INSERT:
- bindValue(insertStatement, mutation);
- count += 1;
- insertStatement.execute();
+ statement = insertStatement;
break;
case UPSERT:
- bindValue(upsertStatement, mutation);
- count += 1;
- upsertStatement.execute();
+ statement = upsertStatement;
break;
default:
String msg = String.format(
"Unsupported action %s, can be one of %s,
or not set which indicate %s",
mutation.getType(),
Arrays.toString(MutationType.values()), MutationType.INSERT);
throw new IllegalArgumentException(msg);
}
+ bindValue(statement, mutation);
+ count += 1;
+ if (jdbcSinkConfig.isUseJdbcBatch()) {
+ if (currentBatch != null && statement != currentBatch)
{
+ executeBatch(swapList, currentBatch);
+ if (log.isDebugEnabled()) {
+ log.debug("Flushed {} messages in {} ms",
count, (System.nanoTime() - start) / 1000 / 1000);
+ }
+ start = System.nanoTime();
+ }
+ statement.addBatch();
+ currentBatch = statement;
+ } else {
+ statement.execute();
+ if (!jdbcSinkConfig.isUseTransactions()) {
+ swapList.removeFirst().ack();
+ }
+ }
}
- if (jdbcSinkConfig.isUseTransactions()) {
- connection.commit();
+
+ if (jdbcSinkConfig.isUseJdbcBatch()) {
+ executeBatch(swapList, currentBatch);
+ if (log.isDebugEnabled()) {
+ log.debug("Flushed {} messages in {} ms", count,
(System.nanoTime() - start) / 1000 / 1000);
+ }
+ } else {
+ if (jdbcSinkConfig.isUseTransactions()) {
+ connection.commit();
+ swapList.forEach(Record::ack);
+ }
Review Comment:
This looks like transactions and batches are mutually exclusive.
Transactions and batching are all interwoven here; I'd add two helper methods,
like "internalFlushBatch" and "internalFlush", so you can do something like:
```java
if (needToBatch) {
internalFlushBatch();
} else {
internalFlush();
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]