Hello Dev team,
We are giving our first shot in writing Beam IO connector for Oracle
Streaming Service (OSS). The plan is to first implement it for enterprise
use and based on the feedback and stability make it available open source.
This is our first attempt in developing a Beam IO connector and so far we
have progressed with the help of Beam documentation and other related IOs
like KafkaIO, KinesisIO. Thanks to the community on that front.
Now OSS *has a read limit of 200ms* so when we read the data as shown below
in our UnboundedReaders *advance()* method
// Get Messages
GetMessagesResponse getResponse = this.streamClient.getMessages(getRequest);
We are able to read around five message but after that we are getting *request
throttling error*
Request was throttled because requests limit exhausted, next request can be
made in 200 ms
We tried with an initial solution of introducing *Thread.sleep(200)* before
the getMessages to see how it is behaving and this time we are *able to
read around 300+ messages*. With the expertise available in this forum, I
would like to hear inputs on two points.
1.
How to implement this feature in a proper way rather than just with a
one-line Thread.sleep(200)
2.
After adding Thread.sleep(200) and reading 300+ messages the pipeline
encountered below error. I do not see any implementation specific detail in
the stack trace. Can I get an insight what this error could be and how to
handle.
java.lang.NullPointerException
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream
(CoderUtils.java:82)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
(CoderUtils.java:66)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray
(CoderUtils.java:51)
at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
at
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader
(UnboundedReadEvaluatorFactory.java:224)
at
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement
(UnboundedReadEvaluatorFactory.java:132)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements
(DirectTransformExecutor.java:160)
at org.apache.beam.runners.direct.DirectTransformExecutor.run
(DirectTransformExecutor.java:124)
at java.util.concurrent.Executors$RunnableAdapter.call
(Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker
(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run
(ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
--
Thanks,
Praveen K Viswanathan