[ https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=339593&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339593 ]
ASF GitHub Bot logged work on BEAM-8554: ---------------------------------------- Author: ASF GitHub Bot Created on: 06/Nov/19 21:12 Start Date: 06/Nov/19 21:12 Worklog Time Spent: 10m Work Description: scwhittle commented on pull request #10013: [BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that … URL: https://github.com/apache/beam/pull/10013#discussion_r343328612 ########## File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java ########## @@ -949,63 +949,14 @@ public void testKeyCommitTooLargeException() throws Exception { assertEquals(2, result.size()); assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), result.get(2L)); assertTrue(result.containsKey(1L)); - assertEquals("large_key", result.get(1L).getKey().toStringUtf8()); - assertTrue(result.get(1L).getSerializedSize() > 1000); - // Spam worker updates a few times. - int maxTries = 10; - while (--maxTries > 0) { - worker.reportPeriodicWorkerUpdates(); - Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); - } + WorkItemCommitRequest largeCommit = result.get(1L); + assertEquals("large_key", largeCommit.getKey().toStringUtf8()); - // We should see an exception reported for the large commit but not the small one. 
- ArgumentCaptor<WorkItemStatus> workItemStatusCaptor = - ArgumentCaptor.forClass(WorkItemStatus.class); - verify(mockWorkUnitClient, atLeast(2)).reportWorkItemStatus(workItemStatusCaptor.capture()); - List<WorkItemStatus> capturedStatuses = workItemStatusCaptor.getAllValues(); - boolean foundErrors = false; - for (WorkItemStatus status : capturedStatuses) { - if (!status.getErrors().isEmpty()) { - assertFalse(foundErrors); - foundErrors = true; - String errorMessage = status.getErrors().get(0).getMessage(); - assertThat(errorMessage, Matchers.containsString("KeyCommitTooLargeException")); - } - } - assertTrue(foundErrors); - } - - @Test - public void testKeyCommitTooLargeException_StreamingEngine() throws Exception { - KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); - - List<ParallelInstruction> instructions = - Arrays.asList( - makeSourceInstruction(kvCoder), - makeDoFnInstruction(new LargeCommitFn(), 0, kvCoder), - makeSinkInstruction(kvCoder, 1)); - - FakeWindmillServer server = new FakeWindmillServer(errorCollector); - server.setExpectedExceptionCount(1); - - StreamingDataflowWorkerOptions options = - createTestingPipelineOptions(server, "--experiments=enable_streaming_engine"); - StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */); - worker.setMaxWorkItemCommitBytes(1000); - worker.start(); - - server.addWorkToOffer(makeInput(1, 0, "large_key")); - server.addWorkToOffer(makeInput(2, 0, "key")); - server.waitForEmptyWorkQueue(); - - Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(1); - - assertEquals(2, result.size()); - assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), result.get(2L)); - assertTrue(result.containsKey(1L)); - assertEquals("large_key", result.get(1L).getKey().toStringUtf8()); - assertTrue(result.get(1L).getSerializedSize() > 1000); + // The large commit should have its flags set marking it for truncation + 
+ assertTrue(largeCommit.getExceedsMaxWorkItemCommitBytes());
+ assertTrue(largeCommit.getSerializedSize() < 100);

Review comment:
Also verify that the repeated fields for timers and output messages are empty.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 339593)
Time Spent: 50m (was: 40m)

> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to
> be broken up
> -----------------------------------------------------------------------------------------
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
> Time Spent: 50m
> Remaining Estimate: 0h
>
> +Background:+
> When a WorkItemCommitRequest is generated that's bigger than the permitted
> size (> ~180 MB), a KeyCommitTooLargeException is logged (_not thrown_) and
> the request is still sent to the service. The service rejects the commit,
> but breaks up input messages that were bundled together and adds them to new,
> smaller work items that will later be pulled and re-tried - likely without
> generating another commit that is too large.
> When a WorkItemCommitRequest is generated that's too large to be sent back to
> the service (> 2 GB), a KeyCommitTooLargeException is thrown and nothing is
> sent back to the service.
>
> +Proposed Improvement+
> In both cases, prevent the doomed, large commit item from being sent back to
> the service. Instead, send flags in the commit request signaling that the
> current work item led to a commit that is too large and the work item should
> be broken up.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)