xinyuiscool commented on code in PR #1639: URL: https://github.com/apache/samza/pull/1639#discussion_r1010901435
########## samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java: ########## @@ -74,20 +74,30 @@ public void describe(StreamApplicationDescriptor appDescriptor) { .map(KV::getValue) .partitionBy(PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p1") + .map(kv -> KV.of(kv.getKey() * 31, kv.getValue())) + .partitionBy(KV::getKey, KV::getValue, KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "p2") .sink((m, collector, coordinator) -> { RECEIVED.add(m.getValue()); }); } } - // The test can be occasionally flaky, so we set Ignore annotation - // Remove ignore annotation and run the test as follows: - // ./gradlew :samza-test:test --tests org.apache.samza.test.drain.DrainHighLevelApiIntegrationTest -PscalaSuffix=2.12 + /** + * This test will test drain and consumption of some messages from the in-memory topic. + * In order to simulate the real-world behaviour of drain, the test adds messages to the in-memory topic buffer periodically + * in a delayed fashion instead of all at once. The test then writes the drain notification message to the in-memory + * metadata store to drain and stop the pipeline. This write is done shortly after the pipeline starts and before all + * the messages are written to the topic's buffer. As a result, the total count of the processed messages will be less + * than the expected count of messages. + * */ @Ignore @Test - public void testPipeline() { + public void testDrain() { Review Comment: Seems most of the integration tests are ignored due to relying on wall-clock time. Is it possible to enable a couple and we don't need to do this timed wait? Not sure this TestRunner support waitForFinish(). but samza runners have this api so in theory we should be able to use it. ########## samza-core/src/main/java/org/apache/samza/container/RunLoop.java: ########## @@ -875,49 +876,40 @@ private boolean shouldDrain() { return false; } - if (!pendingEnvelopeQueue.isEmpty()) { - PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek(); - IncomingMessageEnvelope envelope = pendingEnvelope.envelope; + if (pendingEnvelopeQueue.size() > 0) { + final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek(); + final IncomingMessageEnvelope envelope = pendingEnvelope.envelope; if (envelope.isDrain()) { final DrainMessage message = (DrainMessage) envelope.getMessage(); if (!message.getRunId().equals(runId)) { - // Removing the drain message from the pending queue as it doesn't match with the current runId - // Removing it will ensure that it is not picked up by process() - pendingEnvelopeQueue.remove(); + // Removing the drain message from the pending queue as it doesn't match with the current deployment + final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove(); + consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition()); } else { + // Found drain message matching the current deployment + // set the RunLoop to drain mode if (!isDraining) { drain(); } - if (elasticityFactor <= 1) { - SystemStreamPartition ssp = envelope.getSystemStreamPartition(); - processingSspSetToDrain.remove(ssp); - } else { - // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer. - // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by - // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed - // from task's processing set irrespective of keyBucket. - SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition(); - Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream() - .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream()) - && sspInSet.getPartition().equals(sspOfEnvelope.getPartition())) - .findFirst(); - ssp.ifPresent(processingSspSetToDrain::remove); - } if (!hasIntermediateStreams) { - // Don't remove from the pending queue as we want the DAG to pick up Drain message and propagate it to - // intermediate streams + // The flow below only applies to samza low-level API + + // For high-level API, we do not remove the message from pending queue. + // It will be picked by the process flow instead of drain flow, as we want the drain control message + // to be processed by the High-level API Operator DAG. + + processingSspSetToDrain.remove(envelope.getSystemStreamPartition()); pendingEnvelopeQueue.remove(); + return processingSspSetToDrain.isEmpty(); Review Comment: seems we will return this anyway in the end. Do we need to have another return here? ########## samza-core/src/main/java/org/apache/samza/container/RunLoop.java: ########## @@ -875,49 +876,40 @@ private boolean shouldDrain() { return false; } - if (!pendingEnvelopeQueue.isEmpty()) { - PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek(); - IncomingMessageEnvelope envelope = pendingEnvelope.envelope; + if (pendingEnvelopeQueue.size() > 0) { + final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek(); + final IncomingMessageEnvelope envelope = pendingEnvelope.envelope; if (envelope.isDrain()) { final DrainMessage message = (DrainMessage) envelope.getMessage(); if (!message.getRunId().equals(runId)) { - // Removing the drain message from the pending queue as it doesn't match with the current runId - // Removing it will ensure that it is not picked up by process() - pendingEnvelopeQueue.remove(); + // Removing the drain message from the pending queue as it doesn't match with the current deployment + final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove(); + consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition()); } else { + // Found drain message matching the current deployment + // set the RunLoop to drain mode if (!isDraining) { drain(); } - if (elasticityFactor <= 1) { - SystemStreamPartition ssp = envelope.getSystemStreamPartition(); - processingSspSetToDrain.remove(ssp); - } else { - // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer. - // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by - // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed - // from task's processing set irrespective of keyBucket. - SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition(); - Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream() - .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream()) - && sspInSet.getPartition().equals(sspOfEnvelope.getPartition())) - .findFirst(); - ssp.ifPresent(processingSspSetToDrain::remove); - } if (!hasIntermediateStreams) { Review Comment: It's not very safe to use intermediatestreams to decide whether high-level or low-level. It's possible that a high-level job has no intermediate streams but has states that needs to be drained. Do we even support drain on low-level api? I think we don't have such support since watermark doesn't mean anything. If we only use drain on high-level apis, I think we can safely delete this check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org