Since you are working on a new connector, I would strongly suggest writing it as a splittable DoFn instead of an UnboundedSource. See this thread[1] for additional details and some caveats on the recommendation.
1) You can return false from advance and the runner will execute advance at some point in time instead of sleeping. This is also the correct thing to do if you hit a throttling error. With a splittable DoFn you can return a process continuation allowing you to suggest an amount of time to wait before being resumed.

2) It looks like null was returned as the checkpoint mark coder[2].

1: https://lists.apache.org/thread.html/r76bac40fd22ebf96f379efbaef36fc27c65bdb859f504e19da76ff01%40%3Cdev.beam.apache.org%3E
2: https://github.com/apache/beam/blob/fa3ca2b11e2ca031232245814389d29c805f79e7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L223

On Thu, Jul 30, 2020 at 3:41 PM Praveen K Viswanathan <[email protected]> wrote:

> Hello Dev team,
>
> We are giving our first shot in writing Beam IO connector for Oracle
> Streaming Service (OSS). The plan is to first implement it for enterprise
> use and based on the feedback and stability make it available open source.
> This is our first attempt in developing a Beam IO connector and so far we
> have progressed with the help of Beam documentation and other related IOs
> like KafkaIO, KinesisIO. Thanks to the community on that front.
>
> Now OSS *has a read limit of 200ms* so when we read the data as shown
> below in our UnboundedReaders *advance()* method
>
> // Get Messages
> GetMessagesResponse getResponse =
>     this.streamClient.getMessages(getRequest);
>
> We are able to read around five message but after that we are getting
> *request throttling error*
>
> Request was throttled because requests limit exhausted, next request can
> be made in 200 ms
>
> We tried with an initial solution of introducing *Thread.sleep(200)*
> before the getMessages to see how it is behaving and this time we are
> *able to read around 300+ messages*. With the expertise available in this
> forum, I would like to hear inputs on two points.
>
> 1. How to implement this feature in a proper way rather than just with a
>    one-line Thread.sleep(200)
>
> 2. After adding Thread.sleep(200) and reading 300+ messages the pipeline
>    encountered below error. I do not see any implementation specific
>    detail in the stack trace. Can I get an insight what this error could
>    be and how to handle.
>
> java.lang.NullPointerException
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:82)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:66)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:51)
>     at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader (UnboundedReadEvaluatorFactory.java:224)
>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement (UnboundedReadEvaluatorFactory.java:132)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:160)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:124)
>     at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>     at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run (Thread.java:748)
>
> --
> Thanks,
> Praveen K Viswanathan
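
To make point 1 a bit more concrete, here is a rough sketch of an advance() that never sleeps and simply returns false while the service's throttle window is still open. It is plain Java rather than a real UnboundedReader, so treat it as an illustration only: MessageClient and ThrottledException are hypothetical stand-ins for your OSS client and its throttling error, the 200 ms spacing is taken from the error message you quoted, and the clock is passed in as a parameter purely to keep the sketch easy to test.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Sketch of a reader whose advance() backs off without blocking. */
final class ThrottleAwareReader {

    /** Hypothetical stand-in for the OSS stream client; not a real API. */
    public interface MessageClient {
        List<String> getMessages() throws ThrottledException;
    }

    /** Hypothetical exception modelling the service's throttling error. */
    public static final class ThrottledException extends Exception {
        final long retryAfterMillis;

        public ThrottledException(long retryAfterMillis) {
            super("throttled, next request can be made in " + retryAfterMillis + " ms");
            this.retryAfterMillis = retryAfterMillis;
        }
    }

    // Assumption: minimum spacing between requests, from the quoted error text.
    private static final long READ_INTERVAL_MILLIS = 200;

    private final MessageClient client;
    private final Queue<String> buffer = new ArrayDeque<>();
    private long nextRequestAtMillis = 0;
    private String current;

    ThrottleAwareReader(MessageClient client) {
        this.client = client;
    }

    /**
     * Same contract as an unbounded reader's advance(): true if a record is
     * now available via getCurrent(), false if the caller should try later.
     */
    boolean advance(long nowMillis) {
        // Only call the service when nothing is buffered and the window has passed.
        if (buffer.isEmpty() && nowMillis >= nextRequestAtMillis) {
            try {
                buffer.addAll(client.getMessages());
                nextRequestAtMillis = nowMillis + READ_INTERVAL_MILLIS;
            } catch (ThrottledException e) {
                // Do not sleep; remember when to retry and report "no data yet".
                nextRequestAtMillis = nowMillis + e.retryAfterMillis;
            }
        }
        current = buffer.poll();
        return current != null;
    }

    String getCurrent() {
        return current;
    }
}
```

In a real reader you would read the clock inside advance() and rely on the runner to call it again later. With a splittable DoFn the same retry delay would instead be handed back through the process continuation.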
