Take a look at the WatchGrowthFn[1] and also the in-progress Kafka PR[2].

1: https://github.com/apache/beam/blob/6612b24ada9382706373db547b5606d6e0496b02/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L787
2: https://github.com/apache/beam/pull/11749
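On point 1 of the quoted message below (return false from advance rather than sleeping), here is a minimal sketch of the idea in plain Java. It is not Beam code: MessageClient, ThrottleAwareReader and the injected clock are hypothetical stand-ins, and the 200 ms interval is simply the figure from the throttling error in this thread. In a real UnboundedReader the same check would sit at the top of advance().

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the streaming client; not the real OSS SDK. */
interface MessageClient {
  /** Returns the next batch of messages; the batch may be empty. */
  List<String> getMessages();
}

/** Sketch of a reader whose advance() never blocks the calling thread. */
class ThrottleAwareReader {
  private final MessageClient client;
  private final LongSupplier clockMillis;  // injectable clock so the logic is testable
  private final long minIntervalMillis;    // assumed service limit, e.g. 200
  private final Queue<String> buffer = new ArrayDeque<>();
  private long nextAllowedRequestMillis = Long.MIN_VALUE;
  private String current;

  ThrottleAwareReader(MessageClient client, LongSupplier clockMillis, long minIntervalMillis) {
    this.client = client;
    this.clockMillis = clockMillis;
    this.minIntervalMillis = minIntervalMillis;
  }

  /** Returns true if a record is available, false if the caller should retry later. */
  boolean advance() {
    current = null;
    if (buffer.isEmpty()) {
      long now = clockMillis.getAsLong();
      if (now < nextAllowedRequestMillis) {
        // Too early to call the service again: report "no data yet" instead of sleeping.
        return false;
      }
      nextAllowedRequestMillis = now + minIntervalMillis;
      buffer.addAll(client.getMessages());
    }
    current = buffer.poll();  // null when the fetched batch was empty
    return current != null;
  }

  /** Returns the record produced by the last successful advance(). */
  String getCurrent() {
    if (current == null) {
      throw new NoSuchElementException("advance() has not produced a record");
    }
    return current;
  }
}
```

The reader drains the batch it already holds before it considers calling the service again, so only calls that would actually hit the service are subject to the interval check.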
On Tue, Aug 4, 2020 at 3:33 PM Praveen K Viswanathan <[email protected]> wrote:

> Thanks for the suggestions, Luke. As you know, we are just starting and
> should be able to switch to SplittableDoFn, if that's the future of Beam IO
> connectors. The SplittableDoFn page has the design details, but it would be
> great if we could look at an IO connector built using SplittableDoFn
> for reference and to map the design details to an actual implementation.
> Could you please suggest any such IO for reference?
>
> In parallel, I will also try your suggestions on advance() and the
> checkpoint mark coder to close that issue.
>
> Thanks,
> Praveen
>
> On Mon, Aug 3, 2020 at 3:28 PM Luke Cwik <[email protected]> wrote:
>
>> Since you are working on a new connector I would very strongly
>> suggest writing it as a splittable DoFn instead of an UnboundedSource. See
>> this thread[1] for additional details and some caveats on the
>> recommendation.
>>
>> 1) You can return false from advance and the runner will execute advance
>> at some point in time instead of sleeping. This is also the correct thing
>> to do if you hit a throttling error. With a splittable DoFn you can return
>> a process continuation allowing you to suggest an amount of time to wait
>> before being resumed.
>>
>> 2) It looks like null was returned as the checkpoint mark coder[2].
>>
>> 1: https://lists.apache.org/thread.html/r76bac40fd22ebf96f379efbaef36fc27c65bdb859f504e19da76ff01%40%3Cdev.beam.apache.org%3E
>> 2: https://github.com/apache/beam/blob/fa3ca2b11e2ca031232245814389d29c805f79e7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L223
>>
>> On Thu, Jul 30, 2020 at 3:41 PM Praveen K Viswanathan <[email protected]> wrote:
>>
>>> Hello Dev team,
>>>
>>> We are taking our first shot at writing a Beam IO connector for Oracle
>>> Streaming Service (OSS). The plan is to first implement it for enterprise
>>> use and, based on the feedback and stability, make it available as open source.
>>> This is our first attempt at developing a Beam IO connector and so far we
>>> have progressed with the help of the Beam documentation and other related IOs
>>> like KafkaIO and KinesisIO. Thanks to the community on that front.
>>>
>>> Now OSS *has a read limit of 200ms*, so when we read the data as shown
>>> below in our UnboundedReader's *advance()* method
>>>
>>> // Get Messages
>>> GetMessagesResponse getResponse =
>>>     this.streamClient.getMessages(getRequest);
>>>
>>> we are able to read around five messages, but after that we get a
>>> *request throttling error*:
>>>
>>> Request was throttled because requests limit exhausted, next request can
>>> be made in 200 ms
>>>
>>> As an initial solution we tried introducing *Thread.sleep(200)*
>>> before the getMessages call to see how it behaves, and this time we are
>>> *able to read around 300+ messages*. With the expertise available in this
>>> forum, I would like to hear inputs on two points.
>>>
>>> 1. How to implement this feature in a proper way rather than just with
>>>    a one-line Thread.sleep(200).
>>> 2. After adding Thread.sleep(200) and reading 300+ messages the
>>>    pipeline encountered the error below. I do not see any
>>>    implementation-specific detail in the stack trace. Can I get some
>>>    insight into what this error could be and how to handle it?
>>>
>>> java.lang.NullPointerException
>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:82)
>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:66)
>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:51)
>>>   at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
>>>   at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader (UnboundedReadEvaluatorFactory.java:224)
>>>   at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement (UnboundedReadEvaluatorFactory.java:132)
>>>   at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:160)
>>>   at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:124)
>>>   at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>>>   at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>>>   at java.lang.Thread.run (Thread.java:748)
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>
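On the NullPointerException in the quoted trace: the frames go through CoderUtils.clone, which copies the checkpoint mark by encoding it and decoding it again, so a null checkpoint mark coder fails at the encode step. The following is a rough, Beam-free sketch of that mechanism. SimpleCoder, OffsetCheckpoint, OffsetCheckpointCoder and CloneSketch are all hypothetical names invented for illustration; in an actual connector the fix would be for the source's getCheckpointMarkCoder() to return a real coder rather than null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, simplified stand-in for a coder; not Beam's Coder class. */
interface SimpleCoder<T> {
  void encode(T value, DataOutputStream out) throws IOException;

  T decode(DataInputStream in) throws IOException;
}

/** Hypothetical checkpoint that records the last offset read. */
class OffsetCheckpoint {
  final long offset;

  OffsetCheckpoint(long offset) {
    this.offset = offset;
  }
}

/** A non-null coder for the checkpoint: writes and reads the offset as a long. */
class OffsetCheckpointCoder implements SimpleCoder<OffsetCheckpoint> {
  @Override
  public void encode(OffsetCheckpoint value, DataOutputStream out) throws IOException {
    out.writeLong(value.offset);
  }

  @Override
  public OffsetCheckpoint decode(DataInputStream in) throws IOException {
    return new OffsetCheckpoint(in.readLong());
  }
}

class CloneSketch {
  /** Copies a value by encoding it to bytes and decoding those bytes again. */
  static <T> T cloneViaCoder(SimpleCoder<T> coder, T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      // If coder is null, this call throws NullPointerException.
      coder.encode(value, new DataOutputStream(bytes));
      return coder.decode(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

With OffsetCheckpointCoder the round trip returns an equivalent checkpoint; passing null as the coder reproduces the same kind of failure as in the trace.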
