[ https://issues.apache.org/jira/browse/GEODE-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095507#comment-17095507 ]
ASF GitHub Bot commented on GEODE-7971:
---------------------------------------
albertogpz commented on a change in pull request #4928:
URL: https://github.com/apache/geode/pull/4928#discussion_r417369529
##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1282,22 +1277,90 @@ public List peek(int batchSize, int timeToWait) throws InterruptedException, Cac
Thread.currentThread().interrupt();
break;
}
- continue;
}
}
+
+ if (batch.size() > 0) {
+ peekEventsFromIncompleteTransactions(batch, incompleteTransactionsInBatch, prQ);
+ }
+
if (isDebugEnabled) {
logger.debug("{}: Peeked a batch of {} entries. The size of the queue is {}. localSize is {}",
this, batch.size(), size(), localSize());
}
+
if (batch.size() == 0) {
blockProcessorThreadIfRequired();
}
return batch;
}
+ private boolean stopPeekingDueToTime(long currentTime, int timeToWait, long end) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ // If time to wait is -1 (don't wait) or time interval has elapsed
+ if (isDebugEnabled) {
+ logger.debug("{}: Peek current time: {}", this, currentTime);
+ }
+ if (timeToWait == -1 || (end <= currentTime)) {
+ if (isDebugEnabled) {
+ logger.debug("{}: Peek breaking", this);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ protected boolean isGroupTransactionEvents() {
+ return sender.isGroupTransactionEvents();
+ }
+
+ private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch,
+ Map<TransactionId, Integer> incompleteTransactionIdsInBatch, PartitionedRegion prQ) {
+ if (!isGroupTransactionEvents()) {
+ return;
+ }
+
+ if (areAllTransactionsCompleteInBatch(incompleteTransactionIdsInBatch)) {
+ return;
+ }
+
+ int maxRetries = 2;
Review comment:
I agree. The new commit contains a parameter for this.
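For illustration only, a minimal sketch of what replacing the hard-coded `maxRetries = 2` with a parameter could look like. The class `RetrySketch`, the method `retryIncomplete`, and the `tryComplete` callback are hypothetical names, not code from the pull request; the sketch only assumes that each retry attempts to fetch the missing events of every transaction that is still incomplete.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for the retry logic; not Geode's implementation.
final class RetrySketch {

  /**
   * Tries to complete each pending transaction, making at most maxRetries
   * passes over the pending set. Returns the ids that are still incomplete.
   * tryComplete is an assumed callback that returns true once the remaining
   * events of the given transaction were found.
   */
  static Set<String> retryIncomplete(Set<String> incompleteTxIds,
      Predicate<String> tryComplete, int maxRetries) {
    Set<String> remaining = new LinkedHashSet<>(incompleteTxIds);
    for (int attempt = 0; attempt < maxRetries && !remaining.isEmpty(); attempt++) {
      // Drop every transaction that could be completed in this pass.
      remaining.removeIf(tryComplete);
    }
    return remaining;
  }

  public static void main(String[] args) {
    Set<String> pending = new LinkedHashSet<>(Arrays.asList("tx1", "tx2"));
    // In this demo only tx1 can ever be completed.
    Set<String> left = retryIncomplete(pending, "tx1"::equals, 3);
    System.out.println("Still incomplete: " + left);
  }
}
```

Passing the retry count as an argument keeps the loop itself unchanged, so the value can come from whatever configuration mechanism the sender exposes.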
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Gateway sender to deliver transaction events atomically to gateway receivers
> ----------------------------------------------------------------------------
>
> Key: GEODE-7971
> URL: https://issues.apache.org/jira/browse/GEODE-7971
> Project: Geode
> Issue Type: Improvement
> Components: wan
> Reporter: Alberto Gomez
> Assignee: Alberto Gomez
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> The goal of this ticket is to implement the changes needed in the gateway
> sender to prevent events belonging to the same transaction from being spread
> across different batches. In other words, events from the same transaction
> must be sent inside the same batch.
> This will be an optional feature on gateway senders to be enabled via a new
> parameter (--group-transaction-events) and will be restricted to serial
> gateway senders with just one dispatcher thread or to parallel gateway
> senders.
> Apart from the above restriction, grouping the events of a transaction inside
> the same batch can only be achieved if the regions to which the events belong
> are replicated by the same set of gateway senders with the
> --group-transaction-events flag enabled. If this condition is not met, the
> events will still be delivered correctly by the gateway senders, but there is
> no guarantee that all events of a transaction are sent inside the same batch.
> For more details see:
> [https://cwiki.apache.org/confluence/display/GEODE/Gw+sender+to+deliver+transaction+events+atomically+to+receivers]
>
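The grouping behaviour described in the ticket can be sketched roughly as follows. This is a simplified, self-contained model and not Geode code: the `Event` class, its `lastInTx` flag, and `peekBatch` are assumptions made for the example, and the queue is modelled as a plain list. The batch is first filled up to `batchSize`; it is then extended with the remaining events of any transaction whose last event has not been included yet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model of transaction-aware batching; all names are hypothetical.
final class GroupedBatchSketch {

  static final class Event {
    final String key;
    final String txId;      // null for non-transactional events
    final boolean lastInTx; // true for the final event of its transaction

    Event(String key, String txId, boolean lastInTx) {
      this.key = key;
      this.txId = txId;
      this.lastInTx = lastInTx;
    }
  }

  static List<Event> peekBatch(List<Event> queue, int batchSize) {
    List<Event> batch = new ArrayList<>();
    Set<String> incomplete = new LinkedHashSet<>();
    int i = 0;
    // First pass: take events in queue order until the batch is full.
    for (; i < queue.size() && batch.size() < batchSize; i++) {
      Event e = queue.get(i);
      batch.add(e);
      if (e.txId != null) {
        if (e.lastInTx) {
          incomplete.remove(e.txId);
        } else {
          incomplete.add(e.txId);
        }
      }
    }
    // Second pass: add only the events that complete pending transactions.
    for (; i < queue.size() && !incomplete.isEmpty(); i++) {
      Event e = queue.get(i);
      if (e.txId != null && incomplete.contains(e.txId)) {
        batch.add(e);
        if (e.lastInTx) {
          incomplete.remove(e.txId);
        }
      }
    }
    return batch;
  }

  public static void main(String[] args) {
    List<Event> queue = Arrays.asList(
        new Event("a", "tx1", false),
        new Event("b", null, false),
        new Event("c", "tx1", true),
        new Event("d", null, false));
    for (Event e : peekBatch(queue, 2)) {
      System.out.println(e.key);
    }
  }
}
```

In this model a batch may grow beyond `batchSize`, which is the price of keeping a transaction in one piece; events skipped in the second pass stay in the queue for a later batch.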
--
This message was sent by Atlassian Jira
(v8.3.4#803005)