nbali edited a comment on pull request #15951:
URL: https://github.com/apache/beam/pull/15951#issuecomment-1030289511


   @aromanenko-dev I will move it there if it takes longer, but right now I 
think I'm pretty close to the end, so wrapping it up here might make more sense.
   
   So... to my own surprise, I managed to make it work. I had to trigger the 
"unified worker" path in the `DataflowRunner`, and now with a 
`withStopReadTime` call the streaming job actually stops. _(If the need for 
such experiment flags was obvious to everybody and you assumed I already had 
them set, then mea culpa.)_
   
https://github.com/apache/beam/blob/163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L2273-L2279
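
For anyone else hitting this: as far as I can tell, the linked check turns on the unified worker if any one of a few experiments is set. Below is a self-contained sketch of that decision. `UnifiedWorkerCheck` and its method are hypothetical stand-ins, not Beam's API, and the three experiment names (`beam_fn_api`, `use_runner_v2`, `use_unified_worker`) are my reading of the linked lines at that commit, so double-check them against your Beam version.

```java
import java.util.List;

/** Hypothetical stand-in for the unified-worker check in DataflowRunner (not Beam's API). */
final class UnifiedWorkerCheck {

  // Assumption: setting any one of these experiments selects the unified worker path.
  private static final List<String> UNIFIED_WORKER_EXPERIMENTS =
      List.of("beam_fn_api", "use_runner_v2", "use_unified_worker");

  /** Returns true if the configured experiments enable the unified worker path. */
  static boolean useUnifiedWorker(List<String> experiments) {
    if (experiments == null) {
      return false; // no experiments configured at all
    }
    for (String flag : UNIFIED_WORKER_EXPERIMENTS) {
      if (experiments.contains(flag)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(useUnifiedWorker(List.of("use_runner_v2"))); // true
    System.out.println(useUnifiedWorker(List.of("some_other_experiment"))); // false
    System.out.println(useUnifiedWorker(null)); // false
  }
}
```

On a real job, the equivalent is passing something like `--experiments=use_runner_v2` in the pipeline options.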
   
   So, after all this, I think the remaining problem is the following: if you 
don't use the "unified worker", configure your `KafkaIO.read*` with options 
that require `KafkaIO.Read.ReadFromKafkaViaSDF`, and launch it with an 
unbounded `PCollection` (i.e. as a streaming job), then the extra 
functionality of the SDF is dropped without any warning.
   
https://github.com/apache/beam/blob/163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L505-L519
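
To make the failure mode concrete, here is a toy model of the situation above. Everything in it is hypothetical: `ReadConfig` stands in for `KafkaIO.Read` with a single SDF-only option, and `choosePath` encodes my assumption, based on the linked lines, that the Kafka read override applies to streaming jobs that don't use the unified worker.

```java
import java.time.Instant;

/** Toy model of the silent option loss described above; none of these types exist in Beam. */
final class KafkaReadPathModel {

  /** Hypothetical subset of a KafkaIO.Read configuration: one SDF-only option. */
  static final class ReadConfig {
    final Instant stopReadTime; // honored only by the SDF-based read

    ReadConfig(Instant stopReadTime) {
      this.stopReadTime = stopReadTime;
    }
  }

  enum ReadPath {
    SDF,
    LEGACY_UNBOUNDED
  }

  /** Assumption: the Kafka read override applies to streaming jobs without the unified worker. */
  static ReadPath choosePath(boolean streaming, boolean unifiedWorker) {
    return (streaming && !unifiedWorker) ? ReadPath.LEGACY_UNBOUNDED : ReadPath.SDF;
  }

  /** True if an SDF-only option is set but the chosen read path never looks at it. */
  static boolean optionSilentlyDropped(ReadConfig config, boolean streaming, boolean unifiedWorker) {
    return config.stopReadTime != null
        && choosePath(streaming, unifiedWorker) == ReadPath.LEGACY_UNBOUNDED;
  }

  public static void main(String[] args) {
    ReadConfig config = new ReadConfig(Instant.parse("2022-02-04T00:00:00Z"));
    // Streaming, no unified worker: stopReadTime is set but ignored, with no warning.
    System.out.println(optionSilentlyDropped(config, true, false)); // true
    // Streaming on the unified worker: the SDF runs and honors stopReadTime.
    System.out.println(optionSilentlyDropped(config, true, true)); // false
  }
}
```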
   
   So my idea is to add fail-fast checks to the corresponding 
`PTransformOverrideFactory`, i.e. the one behind 
`KafkaIO.Read.KAFKA_READ_OVERRIDE`. We have the `AppliedPTransform` available, 
which contains the `ReadFromKafkaViaSDF`, which in turn contains the 
`Read<> kafkaRead`, which holds the custom properties (`checkStopReadingFn`, 
`stopReadTime`, etc.; _I'm not sure if there is anything else_) that only the 
SDF handles. So if any of those properties is non-null, we should fail. Am I 
on the right track here?
   
https://github.com/apache/beam/blob/163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1339-L1348
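
Roughly, the check I have in mind would look like this. It's a sketch against hypothetical stand-in types so it runs on its own: `SdfOnlyProperties` mimics the relevant part of `kafkaRead` (using `java.time.Instant` and a plain `Function` instead of the real field types), and `checkNoSdfOnlyProperties` is a made-up helper. In an actual patch it would be called from the override factory's replacement step, and the property list is only the one named above, so it may well be incomplete.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for the SDF-only properties carried by Read<> kafkaRead. */
final class SdfOnlyProperties {
  final Function<String, Boolean> checkStopReadingFn; // stand-in for the real function type
  final Instant stopReadTime;

  SdfOnlyProperties(Function<String, Boolean> checkStopReadingFn, Instant stopReadTime) {
    this.checkStopReadingFn = checkStopReadingFn;
    this.stopReadTime = stopReadTime;
  }
}

final class KafkaReadOverrideValidation {

  /**
   * Fails fast if any property that only the SDF-based read honors is set, instead of letting
   * the override silently drop it. Hypothetical helper, not part of KafkaIO.
   */
  static void checkNoSdfOnlyProperties(SdfOnlyProperties props) {
    List<String> offending = new ArrayList<>();
    if (props.checkStopReadingFn != null) {
      offending.add("checkStopReadingFn");
    }
    if (props.stopReadTime != null) {
      offending.add("stopReadTime");
    }
    if (!offending.isEmpty()) {
      throw new IllegalArgumentException(
          "KafkaIO.Read options " + offending + " are only supported by the SDF-based read;"
              + " enable the unified worker (e.g. --experiments=use_runner_v2) or remove them.");
    }
  }

  public static void main(String[] args) {
    // No SDF-only options set: the check passes.
    checkNoSdfOnlyProperties(new SdfOnlyProperties(null, null));
    try {
      checkNoSdfOnlyProperties(
          new SdfOnlyProperties(null, Instant.parse("2022-02-04T00:00:00Z")));
    } catch (IllegalArgumentException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

Collecting every offending property before throwing means the user sees all unsupported options in one error instead of fixing them one at a time.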

