xinyuiscool commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1010901435


##########
samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java:
##########
@@ -74,20 +74,30 @@ public void describe(StreamApplicationDescriptor appDescriptor) {
           .map(KV::getValue)
           .partitionBy(PageView::getMemberId, pv -> pv,
               KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p1")
+          .map(kv -> KV.of(kv.getKey() * 31, kv.getValue()))
+          .partitionBy(KV::getKey, KV::getValue, KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p2")
           .sink((m, collector, coordinator) -> {
             RECEIVED.add(m.getValue());
           });
     }
   }
 
-  // The test can be occasionally flaky, so we set Ignore annotation
-  // Remove ignore annotation and run the test as follows:
-  // ./gradlew :samza-test:test --tests org.apache.samza.test.drain.DrainHighLevelApiIntegrationTest -PscalaSuffix=2.12
+  /**
+   * Tests drain and the consumption of messages from the in-memory topic.
+   * To simulate the real-world behaviour of drain, the test adds messages to the in-memory topic buffer periodically,
+   * in a delayed fashion, instead of all at once. The test then writes the drain notification message to the in-memory
+   * metadata store to drain and stop the pipeline. This write is done shortly after the pipeline starts and before all
+   * the messages are written to the topic's buffer. As a result, the total count of processed messages will be less
+   * than the expected count of messages.
+   */
   @Ignore
   @Test
-  public void testPipeline() {
+  public void testDrain() {

Review Comment:
   Seems most of the integration tests are ignored due to relying on wall-clock time. Is it possible to enable a couple so that we don't need this timed wait? Not sure this TestRunner supports waitForFinish(), but Samza runners have this API, so in theory we should be able to use it.
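   
   For reference, a minimal sketch of what that could look like against the runner-level API (a suggestion only: it assumes `app` and `config` are built the way the existing test builds them, and it uses ApplicationRunner directly since TestRunner may not expose waitForFinish()):
   
   ```java
   import static org.junit.Assert.assertTrue;
   
   import java.time.Duration;
   import org.apache.samza.application.StreamApplication;
   import org.apache.samza.config.Config;
   import org.apache.samza.runtime.ApplicationRunner;
   import org.apache.samza.runtime.ApplicationRunners;
   
   // Hypothetical sketch, not part of this PR: block on the runner
   // instead of sleeping for a fixed wall-clock interval.
   public class DrainWaitSketch {
     void runAndWaitForDrain(StreamApplication app, Config config) {
       ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
       runner.run(null); // no ExternalContext needed for this sketch
       // ... write the drain notification to the in-memory metadata store here ...
       boolean finished = runner.waitForFinish(Duration.ofMinutes(2));
       assertTrue("pipeline did not finish draining within the timeout", finished);
     }
   }
   ```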



##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
         return false;
       }
 
-      if (!pendingEnvelopeQueue.isEmpty()) {
-        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
-        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+      if (pendingEnvelopeQueue.size() > 0) {
+        final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isDrain()) {
           final DrainMessage message = (DrainMessage) envelope.getMessage();
           if (!message.getRunId().equals(runId)) {
-            // Removing the drain message from the pending queue as it doesn't match with the current runId
-            // Removing it will ensure that it is not picked up by process()
-            pendingEnvelopeQueue.remove();
+            // Removing the drain message from the pending queue as it doesn't match with the current deployment
+            final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+            consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
           } else {
+            // Found drain message matching the current deployment
+
             // set the RunLoop to drain mode
             if (!isDraining) {
               drain();
             }
 
-            if (elasticityFactor <= 1) {
-              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-              processingSspSetToDrain.remove(ssp);
-            } else {
-              // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
-              // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by
-              // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
-              // from task's processing set irrespective of keyBucket.
-              SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
-              Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
-                  .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                      && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                  .findFirst();
-              ssp.ifPresent(processingSspSetToDrain::remove);
-            }
 
             if (!hasIntermediateStreams) {
-              // Don't remove from the pending queue as we want the DAG to pick up Drain message and propagate it to
-              // intermediate streams
+              // The flow below only applies to the Samza low-level API.
+
+              // For the high-level API, we do not remove the message from the pending queue.
+              // It will be picked up by the process flow instead of the drain flow, as we want the drain control
+              // message to be processed by the High-level API Operator DAG.
+
+              processingSspSetToDrain.remove(envelope.getSystemStreamPartition());
               pendingEnvelopeQueue.remove();
+              return processingSspSetToDrain.isEmpty();

Review Comment:
   Seems we will return this anyway at the end. Do we need another return here?
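   
   If the method really does end with the same expression, the branch could simply fall through to it; a rough sketch of the tail of the hunk (same names as the quoted code, rest of the method elided):
   
   ```java
   if (!hasIntermediateStreams) {
     // Low-level flow: consume the drain message here so process() never sees it.
     processingSspSetToDrain.remove(envelope.getSystemStreamPartition());
     pendingEnvelopeQueue.remove();
     // No early return: fall through to the common return below.
   }
   // ... remainder of shouldDrain() ...
   return processingSspSetToDrain.isEmpty();
   ```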



##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
         return false;
       }
 
-      if (!pendingEnvelopeQueue.isEmpty()) {
-        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
-        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+      if (pendingEnvelopeQueue.size() > 0) {
+        final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isDrain()) {
           final DrainMessage message = (DrainMessage) envelope.getMessage();
           if (!message.getRunId().equals(runId)) {
-            // Removing the drain message from the pending queue as it doesn't match with the current runId
-            // Removing it will ensure that it is not picked up by process()
-            pendingEnvelopeQueue.remove();
+            // Removing the drain message from the pending queue as it doesn't match with the current deployment
+            final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+            consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
           } else {
+            // Found drain message matching the current deployment
+
             // set the RunLoop to drain mode
             if (!isDraining) {
               drain();
             }
 
-            if (elasticityFactor <= 1) {
-              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-              processingSspSetToDrain.remove(ssp);
-            } else {
-              // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
-              // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by
-              // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
-              // from task's processing set irrespective of keyBucket.
-              SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
-              Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
-                  .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                      && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                  .findFirst();
-              ssp.ifPresent(processingSspSetToDrain::remove);
-            }
 
             if (!hasIntermediateStreams) {

Review Comment:
   It's not very safe to use intermediate streams to decide between high-level and low-level. It's possible that a high-level job has no intermediate streams but has state that needs to be drained.
   
   Do we even support drain on the low-level API? I think we don't have such support, since the watermark doesn't mean anything there. If we only use drain on high-level APIs, I think we can safely delete this check.
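   
   If drain really is high-level-only, a rough sketch of how the whole block could collapse (same names as the quoted hunk, rest of the method elided; a suggestion, not tested code):
   
   ```java
   if (envelope.isDrain()) {
     final DrainMessage message = (DrainMessage) envelope.getMessage();
     if (!message.getRunId().equals(runId)) {
       // Drain message from a different deployment: discard it.
       final PendingEnvelope discarded = pendingEnvelopeQueue.remove();
       consumerMultiplexer.tryUpdate(discarded.envelope.getSystemStreamPartition());
     } else if (!isDraining) {
       // Leave the drain message in the pending queue so the high-level
       // operator DAG processes and propagates it; just flip to drain mode.
       drain();
     }
   }
   ```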



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
