Hello Dev team,

We are making our first attempt at writing a Beam IO connector for Oracle
Streaming Service (OSS). The plan is to implement it for enterprise use
first and then, depending on feedback and stability, make it available as
open source. So far we have progressed with the help of the Beam
documentation and related IOs such as KafkaIO and KinesisIO. Thanks to the
community on that front.

OSS *has a read limit of 200 ms*. We read the data in our UnboundedReader's
*advance()* method as shown below:

// Get Messages

GetMessagesResponse getResponse = this.streamClient.getMessages(getRequest);

We are able to read around five messages, but after that we get a *request
throttling error*:

Request was throttled because requests limit exhausted, next request can be
made in 200 ms

As an initial workaround, we added *Thread.sleep(200)* before the
getMessages call to see how it behaves, and this time we are *able to read
300+ messages*. With the expertise available in this forum, I would like to
hear input on two points.

   1. How can we implement this throttling properly, rather than with a
      one-line Thread.sleep(200)?

   2. After adding Thread.sleep(200) and reading 300+ messages, the pipeline
      failed with the error below. I do not see any implementation-specific
      detail in the stack trace. Could I get some insight into what this
      error could be and how to handle it?

   java.lang.NullPointerException
       at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:82)
       at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:66)
       at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:51)
       at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
       at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader (UnboundedReadEvaluatorFactory.java:224)
       at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement (UnboundedReadEvaluatorFactory.java:132)
       at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:160)
       at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:124)
       at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
       at java.util.concurrent.FutureTask.run (FutureTask.java:266)
       at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
       at java.lang.Thread.run (Thread.java:748)
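
On point 1, one direction we could take instead of sleeping is sketched
below. It is a minimal, untested sketch: the reader tracks when the next
request is allowed, and advance() returns false until then rather than
blocking the thread, on our understanding that the runner will call
advance() again later. PollThrottle, tryAcquire and the injected clock are
hypothetical names of our own, not part of Beam or the OSS SDK, and the
200 ms interval is taken from the throttling message above.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper: permits at most one request per interval, without blocking. */
final class PollThrottle {
    private final long minIntervalNanos;
    private final LongSupplier nanoClock; // e.g. System::nanoTime; injectable for tests
    private long nextAllowedNanos;
    private boolean polledBefore = false;

    PollThrottle(long minIntervalMillis, LongSupplier nanoClock) {
        this.minIntervalNanos = TimeUnit.MILLISECONDS.toNanos(minIntervalMillis);
        this.nanoClock = nanoClock;
    }

    /** Returns true if a request may be sent now, and records that attempt. */
    boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        // Compare via subtraction so the check stays valid if nanoTime wraps around.
        if (polledBefore && now - nextAllowedNanos < 0) {
            return false; // too early: caller should skip this poll
        }
        polledBefore = true;
        nextAllowedNanos = now + minIntervalNanos;
        return true;
    }
}

// Assumed usage inside advance(), with a field
//   PollThrottle throttle = new PollThrottle(200, System::nanoTime);
//
//   if (!throttle.tryAcquire()) {
//       return false; // report "no record right now" instead of sleeping
//   }
//   GetMessagesResponse getResponse = this.streamClient.getMessages(getRequest);
```

Whether returning false quickly is preferable to a short sleep presumably
depends on how often the runner re-invokes advance(), so we would
appreciate guidance on that as well.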



-- 
Thanks,
Praveen K Viswanathan
