scwhittle commented on code in PR #35942:
URL: https://github.com/apache/beam/pull/35942#discussion_r2297627751


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -155,10 +155,12 @@ private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws
     public long add(WindowedValue<T> data) throws IOException {
       ByteString key, value;
       ByteString id = ByteString.EMPTY;
+      stream.toByteStringAndReset();

Review Comment:
   It seems the likely cause is an exception thrown from `coder.encode` on 
line 149, which means the stream reset on line 150 never runs. If that 
exception is then caught and ignored at a higher level and further encoding 
is attempted, the `checkState` will be hit.
   
   Since `add` is executed often, it seems better to fix this in `encode`: 
wrap the existing logic in a try/catch and reset the stream before 
propagating an exception thrown by the coder, instead of adding the extra 
reset here.
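
A minimal sketch of the suggested shape, not the actual `WindmillSink` code. It assumes stand-ins so that it compiles on its own: `SimpleCoder` is a hypothetical interface in place of Beam's `Coder`, a plain `java.io.ByteArrayOutputStream` replaces the sink's stream, and an `IllegalStateException` plays the role of the `checkState`. The point is only that the failure path resets the shared stream before rethrowing, so a later call starts from an empty buffer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class EncodeResetSketch {
  /** Hypothetical stand-in for Beam's Coder; only encode is modeled. */
  interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
  }

  // Shared, reused buffer (assumption: stands in for the sink's stream).
  private final ByteArrayOutputStream stream = new ByteArrayOutputStream();

  <T> byte[] encode(SimpleCoder<T> coder, T object) throws IOException {
    // Stand-in for the checkState: the stream must be empty on entry.
    if (stream.size() != 0) {
      throw new IllegalStateException("stream was not reset after previous encode");
    }
    try {
      coder.encode(object, stream);
    } catch (IOException | RuntimeException e) {
      // Discard any partially written bytes before propagating the failure.
      stream.reset();
      throw e;
    }
    // Success path: copy out the bytes, then reset for the next caller.
    byte[] encoded = stream.toByteArray();
    stream.reset();
    return encoded;
  }
}
```

With this shape the reset cost is only paid when the coder actually throws, so the hot path in `add` stays unchanged. A `try`/`finally` around the whole body would be an alternative with the same effect.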



