nbali commented on pull request #15951: URL: https://github.com/apache/beam/pull/15951#issuecomment-1030289511
@aromanenko-dev I will move it there if it takes longer, but right now I think I'm pretty close to the end, so wrapping it up here might make more sense for now. So... to my own surprise, I managed to make it work. I had to trigger the "unified worker" path in the `DataflowRunner`, and now, with a `withStopReadTime` call, the streaming job actually stops. _(If the necessity of such experiment flags was obvious to everybody and you assumed I already had it set, then mea culpa.)_

https://github.com/apache/beam/blob/163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L2273-L2279

So after all this, I think the remaining problem is the following: if you don't use the "unified worker", configure your `KafkaIO.read*` with settings that require `KafkaIO.Read.ReadFromKafkaViaSDF`, and launch it with an unbounded `PCollection` (i.e. as a streaming job), then the extra functionality of the SDF is silently dropped, without any warning.

https://github.com/apache/beam/blob/163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L505-L519

So my idea is to add fail-fast checks to the corresponding `PTransformOverrideFactory`. We have the `AppliedPTransform` available, which contains the `ReadFromKafkaViaSDF`, which contains the `Read<> kafkaRead`, which in turn contains the custom properties that only the SDF handles (`checkStopReadingFn`, `stopReadTime`, etc.; _I'm not sure if there is anything else_). So if any of those properties is non-null, we should fail. Am I on the right path here?

https://github.com/apache/beam/blob/163ac6a3c10c26898ad89ca8bedde8ef78ee7ee2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1339-L1348
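The fail-fast idea for the override factory could look roughly like the sketch below. It is a minimal, hypothetical sketch, not Beam's API: `KafkaReadSettings` and `SdfOnlySettingsCheck` are made-up stand-ins for the `Read<> kafkaRead` held by `ReadFromKafkaViaSDF`, `java.time.Instant` stands in for the Joda-Time `Instant` that Beam actually uses, and only the two properties named above (`checkStopReadingFn`, `stopReadTime`) are checked. How the real factory would read these values from `KafkaIO.Read` is not shown.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the SDF-only settings of KafkaIO.Read.
// The real class, its field types and its accessors may differ.
final class KafkaReadSettings {
  final Object checkStopReadingFn; // stand-in for the real CheckStopReadingFn type
  final Instant stopReadTime;      // Beam itself uses org.joda.time.Instant

  KafkaReadSettings(Object checkStopReadingFn, Instant stopReadTime) {
    this.checkStopReadingFn = checkStopReadingFn;
    this.stopReadTime = stopReadTime;
  }
}

// Hypothetical helper that an override factory could call before swapping the
// SDF-based read for the legacy unbounded-source read.
final class SdfOnlySettingsCheck {
  private SdfOnlySettingsCheck() {}

  /** Returns the names of all SDF-only settings that are set (non-null). */
  static List<String> sdfOnlySettingsInUse(KafkaReadSettings read) {
    List<String> inUse = new ArrayList<>();
    if (read.checkStopReadingFn != null) {
      inUse.add("checkStopReadingFn");
    }
    if (read.stopReadTime != null) {
      inUse.add("stopReadTime");
    }
    return inUse;
  }

  /** Throws if the legacy (non-SDF) path would silently ignore any SDF-only setting. */
  static void validateForLegacyOverride(KafkaReadSettings read) {
    List<String> inUse = sdfOnlySettingsInUse(read);
    if (!inUse.isEmpty()) {
      throw new IllegalArgumentException(
          "KafkaIO.Read settings " + inUse
              + " are only supported by the SDF-based read and would be ignored here;"
              + " enable the unified worker or remove these settings.");
    }
  }
}
```

Collecting every offending setting before throwing means a user sees all unsupported options in one error instead of fixing them one failed launch at a time.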
