Take a look at the WatchGrowthFn[1] and also the in-progress Kafka PR[2].

1: https://github.com/apache/beam/blob/6612b24ada9382706373db547b5606d6e0496b02/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L787
2: https://github.com/apache/beam/pull/11749
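
For the throttling question quoted below, here is a minimal sketch of the "return false from advance() instead of sleeping" idea. It deliberately does not depend on Beam or on the OSS SDK: ThrottledException, MessageClient and NonBlockingReader are hypothetical names made up for illustration, and the retry delay is assumed to be available on the exception. Treat it as a sketch of the pattern under those assumptions, not as the actual connector code.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the service's throttling error (not the real OSS SDK type). */
class ThrottledException extends RuntimeException {
  final long retryAfterMillis;

  ThrottledException(long retryAfterMillis) {
    super("Request was throttled, next request can be made in " + retryAfterMillis + " ms");
    this.retryAfterMillis = retryAfterMillis;
  }
}

/** Hypothetical stand-in for the streaming client. */
interface MessageClient {
  List<String> getMessages();
}

/** Mirrors the advance()/getCurrent() shape of an UnboundedReader without depending on Beam. */
class NonBlockingReader {
  private final MessageClient client;
  private final LongSupplier clockMillis; // injectable clock, e.g. System::currentTimeMillis
  private final Queue<String> buffer = new ArrayDeque<>();
  private long nextRequestAtMillis = 0L;
  private String current;

  NonBlockingReader(MessageClient client, LongSupplier clockMillis) {
    this.client = client;
    this.clockMillis = clockMillis;
  }

  /** Returns true if a record is available; returns false (never sleeps) otherwise. */
  boolean advance() {
    current = null;
    if (buffer.isEmpty()) {
      long now = clockMillis.getAsLong();
      if (now < nextRequestAtMillis) {
        return false; // still backing off; the caller is expected to try again later
      }
      try {
        buffer.addAll(client.getMessages());
      } catch (ThrottledException e) {
        // Remember when the next request is allowed instead of blocking the thread.
        nextRequestAtMillis = now + e.retryAfterMillis;
        return false;
      }
    }
    current = buffer.poll(); // null when the service returned no messages
    return current != null;
  }

  String getCurrent() {
    if (current == null) {
      throw new NoSuchElementException("advance() has not returned true");
    }
    return current;
  }
}
```

In a real UnboundedReader the same bookkeeping would live inside advance(), with the runner deciding when to call it again. With a splittable DoFn you would not need to track the deadline yourself; returning ProcessContinuation.resume().withResumeDelay(...) from the process method expresses the same intent.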

On Tue, Aug 4, 2020 at 3:33 PM Praveen K Viswanathan <
[email protected]> wrote:

> Thanks for the suggestions, Luke. As you know, we are just starting and
> should be able to switch to SplittableDoFn if that is the future of Beam IO
> connectors. The SplittableDoFn page has the design details, but it would be
> great if we could look at an IO connector built using SplittableDoFn
> for reference, to map the design details to an actual implementation.
> Could you please suggest any such IO for reference?
>
> In parallel, I will also try your suggestions for advance() and the
> checkpoint mark coder to close that issue.
>
> Thanks,
> Praveen
>
> On Mon, Aug 3, 2020 at 3:28 PM Luke Cwik <[email protected]> wrote:
>
>> Since you are working on a new connector, I would very strongly
>> suggest writing it as a splittable DoFn instead of an UnboundedSource. See
>> this thread[1] for additional details and some caveats on the
>> recommendation.
>>
>> 1) You can return false from advance() and the runner will call advance()
>> again at a later point in time, so there is no need to sleep. This is also
>> the correct thing to do if you hit a throttling error. With a splittable
>> DoFn you can return a process continuation, which lets you suggest an
>> amount of time to wait before being resumed.
>>
>> 2) It looks like null was returned as the checkpoint mark coder[2].
>>
>> 1:
>> https://lists.apache.org/thread.html/r76bac40fd22ebf96f379efbaef36fc27c65bdb859f504e19da76ff01%40%3Cdev.beam.apache.org%3E
>> 2:
>> https://github.com/apache/beam/blob/fa3ca2b11e2ca031232245814389d29c805f79e7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L223
>>
>> On Thu, Jul 30, 2020 at 3:41 PM Praveen K Viswanathan <
>> [email protected]> wrote:
>>
>>> Hello Dev team,
>>>
>>> We are taking our first shot at writing a Beam IO connector for Oracle
>>> Streaming Service (OSS). The plan is to first implement it for enterprise
>>> use and, based on feedback and stability, make it available as open source.
>>> This is our first attempt at developing a Beam IO connector, and so far we
>>> have progressed with the help of the Beam documentation and related IOs
>>> such as KafkaIO and KinesisIO. Thanks to the community on that front.
>>>
>>> Now, OSS *has a read limit of 200ms*, so when we read the data as shown
>>> below in our UnboundedReader's *advance()* method
>>>
>>> // Get Messages
>>>
>>> GetMessagesResponse getResponse = this.streamClient.getMessages(getRequest);
>>>
>>> We are able to read around five messages, but after that we get a
>>> *request throttling error*:
>>>
>>> Request was throttled because requests limit exhausted, next request can
>>> be made in 200 ms
>>>
>>> As an initial solution, we tried introducing *Thread.sleep(200)*
>>> before the getMessages call to see how it behaves, and this time we are *able
>>> to read 300+ messages*. With the expertise available in this
>>> forum, I would like to hear inputs on two points.
>>>
>>>    1. How can we implement this properly, rather than with just
>>>    a one-line Thread.sleep(200)?
>>>
>>>    2. After adding Thread.sleep(200) and reading 300+ messages, the
>>>    pipeline encountered the error below. I do not see any
>>>    implementation-specific detail in the stack trace. Can I get some
>>>    insight into what this error could be and how to handle it?
>>>
>>>    java.lang.NullPointerException
>>>        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:82)
>>>        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:66)
>>>        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:51)
>>>        at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
>>>        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader (UnboundedReadEvaluatorFactory.java:224)
>>>        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement (UnboundedReadEvaluatorFactory.java:132)
>>>        at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:160)
>>>        at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:124)
>>>        at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>>>        at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>>>        at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>>>        at java.lang.Thread.run (Thread.java:748)
>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>
