scwhittle commented on code in PR #37164:
URL: https://github.com/apache/beam/pull/37164#discussion_r2671506653


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -164,10 +194,20 @@ public Instant getWatermark() {
 
   @Override
   public UnboundedSource.CheckpointMark getCheckpointMark() {
-
-    ImmutableList<BytesXMLMessage> bytesXMLMessages = ImmutableList.copyOf(receivedMessages);
+    Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
+    safeToAckMessages.addAll(receivedMessages);
     receivedMessages.clear();
-    return new SolaceCheckpointMark(bytesXMLMessages);
+    nackCallback =
+        cleanUpThread.submit(
+            () -> {
+              try {
+                Thread.sleep(ackDeadlineSeconds * 1000L);

Review Comment:
   It seems like we should schedule the nack with a delay instead of sleeping
   on the executor thread. If the ack deadline is large enough that more
   checkpoints occur during that period than there are threads, the nack
   callbacks will end up being queued and fall behind.
   
   If we decide to ack, it would also be good to cancel the scheduled nack.
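   
   A minimal sketch of that suggestion, assuming a `ScheduledExecutorService`
   is available to the reader. `DelayedNackSketch`, `PendingNack`,
   `scheduleNack` and `tryClaimForAck` are hypothetical names for illustration
   and are not taken from the PR:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: delay the nack with a timer instead of sleeping on a worker thread. */
final class DelayedNackSketch {
  private final ScheduledExecutorService scheduler;

  DelayedNackSketch(ScheduledExecutorService scheduler) {
    this.scheduler = scheduler;
  }

  /** Handle for the pending nack of one checkpoint (hypothetical type). */
  static final class PendingNack {
    private final ScheduledFuture<?> future;
    private final AtomicBoolean settled;

    PendingNack(ScheduledFuture<?> future, AtomicBoolean settled) {
      this.future = future;
      this.settled = settled;
    }

    /**
     * Called from the finalize path. Returns true if the ack won the race, in which
     * case the scheduled nack is cancelled; returns false if the nack already ran.
     */
    boolean tryClaimForAck() {
      if (!settled.compareAndSet(false, true)) {
        return false;
      }
      future.cancel(false);
      return true;
    }
  }

  /** Schedules the nack after the delay; no thread is blocked while waiting. */
  PendingNack scheduleNack(Runnable nack, long delay, TimeUnit unit) {
    AtomicBoolean settled = new AtomicBoolean(false);
    ScheduledFuture<?> future =
        scheduler.schedule(
            () -> {
              // Run the nack only if the ack path has not claimed the messages.
              if (settled.compareAndSet(false, true)) {
                nack.run();
              }
            },
            delay,
            unit);
    return new PendingNack(future, settled);
  }
}
```

   With this shape, many checkpoints can be pending on a single scheduler
   thread, and the flag decides the ack/nack race so that only one of the two
   runs for a given checkpoint.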



##########
sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java:
##########
@@ -471,73 +475,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
     // No change in the acknowledged messages, because they were acknowledged in the #advance()
     // method.
     assertEquals(4, countAckMessages.get());
-  }
-
-  @Test
-  public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception {
-    AtomicInteger countConsumedMessages = new AtomicInteger(0);
-    AtomicInteger countAckMessages = new AtomicInteger(0);
-
-    // Broker that creates input data
-    SerializableFunction<Integer, BytesXMLMessage> recordFn =
-        index -> {
-          List<BytesXMLMessage> messages = new ArrayList<>();
-          for (int i = 0; i < 10; i++) {
-            messages.add(
-                SolaceDataUtils.getBytesXmlMessage(
-                    "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
-          }
-          countConsumedMessages.incrementAndGet();
-          return getOrNull(index, messages);
-        };
-
-    SessionServiceFactory fakeSessionServiceFactory =
-        MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
-
-    Read<Record> spec =
-        getDefaultRead()
-            .withSessionServiceFactory(fakeSessionServiceFactory)
-            .withMaxNumConnections(4);
-
-    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
-
-    UnboundedReader<Record> reader =
-        initialSource.createReader(PipelineOptionsFactory.create(), null);
-
-    // start the reader and move to the first record
-    assertTrue(reader.start());
-
-    // consume 3 messages (NB: #start() already consumed the first message)
-    for (int i = 0; i < 3; i++) {
-      assertTrue(String.format("Failed at %d-th message", i), reader.advance());
-    }
-
-    // #advance() was called, but the messages were not ready to be acknowledged.
-    assertEquals(0, countAckMessages.get());
-
-    // mark all consumed messages as ready to be acknowledged
-    CheckpointMark checkpointMark = reader.getCheckpointMark();
-
-    // data is flushed
-
-    // consume 1 more message.
-    reader.advance();
-    assertEquals(0, countAckMessages.get());
 
-    // consume 1 more message. No change in the acknowledged messages.
-    reader.advance();
-    assertEquals(0, countAckMessages.get());
+    checkpointMark = reader.getCheckpointMark();
 
-    CheckpointMark checkpointMark2 = reader.getCheckpointMark();
-    // data is prepared for flushing that will be rejected
-
-    // acknowledge from the first checkpoint may arrive late
+    Thread.sleep(2000);
     checkpointMark.finalizeCheckpoint();
-
+    // messages were nacked, no chane in expected values

Review Comment:
   Spelling: "chane" should be "change".



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -461,6 +462,7 @@ public static Read<Solace.Record> read() {
             .setParseFn(SolaceRecordMapper::map)
             .setTimestampFn(SENDER_TIMESTAMP_FUNCTION)
             .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
+            .setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS)

Review Comment:
   Should we set this to a higher default value? If there is fused processing
   that is slow, this could be exceeded and we'd never ack anything. Another
   idea would be to measure the latency between pulling and successful
   finalization calls to notice if we're nacking too aggressively.
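   
   A rough sketch of the latency measurement idea, assuming the reader can
   record a timestamp when messages are pulled and another when their
   checkpoint is finalized. `FinalizeLatencySketch` and its methods are
   hypothetical names, not part of the PR; a real implementation would
   presumably report through the pipeline's metrics rather than plain counters:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: track pull-to-finalize latency against the ack deadline. */
final class FinalizeLatencySketch {
  private final long ackDeadlineNanos;
  private final AtomicLong finalized = new AtomicLong();
  private final AtomicLong pastDeadline = new AtomicLong();
  private final AtomicLong maxLatencyNanos = new AtomicLong();

  FinalizeLatencySketch(long ackDeadlineSeconds) {
    this.ackDeadlineNanos = TimeUnit.SECONDS.toNanos(ackDeadlineSeconds);
  }

  /** Records one successful finalization; both timestamps come from System.nanoTime(). */
  void recordFinalize(long pulledAtNanos, long finalizedAtNanos) {
    long latency = finalizedAtNanos - pulledAtNanos;
    finalized.incrementAndGet();
    maxLatencyNanos.accumulateAndGet(latency, Math::max);
    if (latency > ackDeadlineNanos) {
      // Finalization arrived after the deadline, so these messages were likely nacked.
      pastDeadline.incrementAndGet();
    }
  }

  /** Fraction of finalizations that arrived after the ack deadline (0.0 if none recorded). */
  double pastDeadlineFraction() {
    long total = finalized.get();
    return total == 0 ? 0.0 : (double) pastDeadline.get() / total;
  }

  long maxLatencyMillis() {
    return TimeUnit.NANOSECONDS.toMillis(maxLatencyNanos.get());
  }
}
```

   A persistently high past-deadline fraction would be a signal that the
   default deadline is too low for the pipeline's processing time.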



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java:
##########
@@ -48,13 +48,14 @@ private SolaceCheckpointMark() {}
    *
    * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
    */
-  SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
+  SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
     this.safeToAck = safeToAck;
   }
 
   @Override
   public void finalizeCheckpoint() {
-    for (BytesXMLMessage msg : safeToAck) {
+    BytesXMLMessage msg;
+    while ((msg = safeToAck.poll()) != null) {

Review Comment:
   will the error log below be spammy if things were nacked due to timeout?


