Thanks, Luke. I will go through them and come back if I have any questions.

Regards,
Praveen

On Tue, Aug 4, 2020 at 3:55 PM Luke Cwik <[email protected]> wrote:

> Take a look at the WatchGrowthFn[1] and also the in-progress Kafka PR[2].
>
> 1:
> https://github.com/apache/beam/blob/6612b24ada9382706373db547b5606d6e0496b02/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L787
> 2: https://github.com/apache/beam/pull/11749
>
> On Tue, Aug 4, 2020 at 3:33 PM Praveen K Viswanathan <
> [email protected]> wrote:
>
>> Thanks for the suggestions, Luke. As you know, we are just starting and
>> should be able to switch to SplittableDoFn if that is the future of Beam IO
>> connectors. The SplittableDoFn page has the design details, but it would be
>> great if we could look at an IO connector built using SplittableDoFn
>> for reference, to map the design details to an actual implementation.
>> Could you please suggest such an IO?
>>
>> In parallel, I will also try your suggestions for advance() and the
>> checkpoint mark coder to close out that issue.
>>
>> Thanks,
>> Praveen
>>
>> On Mon, Aug 3, 2020 at 3:28 PM Luke Cwik <[email protected]> wrote:
>>
>>> Since you are working on a new connector I would very strongly
>>> suggest writing it as a splittable DoFn instead of an UnboundedSource. See
>>> this thread[1] about additional details and some caveats on the
>>> recommendation.
>>>
>>> 1) You can return false from advance() and the runner will call advance()
>>> again at some later point, instead of you sleeping. This is also the correct
>>> thing to do if you hit a throttling error. With a splittable DoFn you can
>>> return a ProcessContinuation, which lets you suggest an amount of time to
>>> wait before being resumed.
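A minimal sketch of point 1 in plain Java, assuming a service limit of one GetMessages request per 200 ms. `RequestPacer`, `PacedReader`, `MessageSource`, and `ThrottledException` are hypothetical names, and `PacedReader` only mirrors the contract of `UnboundedReader.advance()`/`getCurrent()` so that the sketch compiles without Beam. Instead of sleeping, `advance()` returns false when the next request window has not opened yet or when the service reports throttling, leaving it to the runner to call again later. In a splittable DoFn, the value of `delayMillis()` could instead be used as the resume delay of a `ProcessContinuation`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.LongSupplier;

/** Non-blocking pacer: tracks the earliest time the next request may be sent. */
final class RequestPacer {
  private final long minIntervalMillis;
  private final LongSupplier clockMillis;
  private long nextAllowedMillis = 0L;

  RequestPacer(long minIntervalMillis, LongSupplier clockMillis) {
    this.minIntervalMillis = minIntervalMillis;
    this.clockMillis = clockMillis;
  }

  /** Milliseconds until the next request is allowed; 0 means it may be sent now. */
  long delayMillis() {
    return Math.max(0L, nextAllowedMillis - clockMillis.getAsLong());
  }

  /** Records that a request was just sent; the next window opens minIntervalMillis later. */
  void recordRequest() {
    nextAllowedMillis = clockMillis.getAsLong() + minIntervalMillis;
  }
}

/** Hypothetical exception standing in for the service's "request was throttled" error. */
final class ThrottledException extends Exception {
  ThrottledException(String message) {
    super(message);
  }
}

/** Hypothetical stand-in for streamClient.getMessages(getRequest). */
interface MessageSource {
  List<String> fetchBatch() throws ThrottledException;
}

/** Mirrors the advance()/getCurrent() contract of an UnboundedReader, without Beam. */
final class PacedReader {
  private final MessageSource source;
  private final RequestPacer pacer;
  private final Deque<String> buffer = new ArrayDeque<>();
  private String current;

  PacedReader(MessageSource source, RequestPacer pacer) {
    this.source = source;
    this.pacer = pacer;
  }

  /** Returns true if a new record is available; never sleeps. */
  boolean advance() {
    if (buffer.isEmpty()) {
      if (pacer.delayMillis() > 0) {
        current = null;
        return false; // Too early: report "no data yet" and let the runner retry later.
      }
      pacer.recordRequest();
      try {
        buffer.addAll(source.fetchBatch());
      } catch (ThrottledException e) {
        current = null;
        return false; // Throttled: back off by returning false instead of sleeping.
      }
    }
    current = buffer.poll(); // null if the fetched batch was empty
    return current != null;
  }

  String getCurrent() {
    if (current == null) {
      throw new NoSuchElementException("advance() has not returned true");
    }
    return current;
  }
}
```

Passing the clock as a `LongSupplier` keeps the pacer testable; in a real reader it would be `System::currentTimeMillis`. Because every request goes through the pacer, repeated `advance()` calls from the runner while the window is closed never reach the service.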
>>>
>>> 2) It looks like null was returned as the checkpoint mark coder[2].
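The stack trace is consistent with this: the direct runner clones the checkpoint mark with `CoderUtils.clone` (called from `UnboundedReadEvaluator.getReader`), which encodes and then decodes the mark, and encoding with a null coder fails with the `NullPointerException`. Below is a minimal stdlib sketch of that round trip, assuming the checkpoint mark is a small `Serializable` value; `OssCheckpointMark` and `CheckpointCloning` are hypothetical names. In the connector itself, one option for such a mark would be returning `SerializableCoder.of(OssCheckpointMark.class)` from `getCheckpointMarkCoder()`, though a more compact custom coder may be preferable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical checkpoint mark: the next offset to read in one stream partition. */
final class OssCheckpointMark implements Serializable {
  private static final long serialVersionUID = 1L;

  final String partition;
  final long nextOffset;

  OssCheckpointMark(String partition, long nextOffset) {
    this.partition = partition;
    this.nextOffset = nextOffset;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof OssCheckpointMark)) {
      return false;
    }
    OssCheckpointMark other = (OssCheckpointMark) o;
    return Objects.equals(partition, other.partition) && nextOffset == other.nextOffset;
  }

  @Override
  public int hashCode() {
    return Objects.hash(partition, nextOffset);
  }
}

final class CheckpointCloning {
  /** Encodes the value to bytes and decodes a fresh copy, like a clone through a coder. */
  static <T extends Serializable> T roundTrip(T value, Class<T> type) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return type.cast(in.readObject());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Runners generally encode checkpoint marks so that a reader can later be recreated from one via `createReader`, which is why the checkpoint mark coder cannot be null.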
>>>
>>> 1:
>>> https://lists.apache.org/thread.html/r76bac40fd22ebf96f379efbaef36fc27c65bdb859f504e19da76ff01%40%3Cdev.beam.apache.org%3E
>>> 2:
>>> https://github.com/apache/beam/blob/fa3ca2b11e2ca031232245814389d29c805f79e7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L223
>>>
>>> On Thu, Jul 30, 2020 at 3:41 PM Praveen K Viswanathan <
>>> [email protected]> wrote:
>>>
>>>> Hello Dev team,
>>>>
>>>> We are taking our first shot at writing a Beam IO connector for Oracle
>>>> Streaming Service (OSS). The plan is to first implement it for enterprise
>>>> use and, based on feedback and stability, make it available as open source.
>>>> This is our first attempt at developing a Beam IO connector, and so far we
>>>> have progressed with the help of the Beam documentation and related IOs
>>>> such as KafkaIO and KinesisIO. Thanks to the community on that front.
>>>>
>>>> OSS *has a read limit of one request every 200 ms*, so when we read data
>>>> as shown below in our UnboundedReader's *advance()* method:
>>>>
>>>> // Get Messages
>>>>
>>>> GetMessagesResponse getResponse = this.streamClient.getMessages(getRequest);
>>>>
>>>> we are able to read around five messages, but after that we get a
>>>> *request throttling error*:
>>>>
>>>> Request was throttled because requests limit exhausted, next request
>>>> can be made in 200 ms
>>>>
>>>> As an initial workaround we introduced *Thread.sleep(200)* before the
>>>> getMessages() call to see how it behaves, and this time we are *able to
>>>> read 300+ messages*. With the expertise available in this forum, I would
>>>> like to hear your input on two points:
>>>>
>>>>    1. How can we implement this properly, rather than with a one-line
>>>>       Thread.sleep(200)?
>>>>    2. After adding Thread.sleep(200) and reading 300+ messages, the
>>>>       pipeline encountered the error below. I do not see any
>>>>       implementation-specific detail in the stack trace. Could I get some
>>>>       insight into what this error could be and how to handle it?
>>>>
>>>>    java.lang.NullPointerException
>>>>        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream (CoderUtils.java:82)
>>>>        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:66)
>>>>        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray (CoderUtils.java:51)
>>>>        at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:141)
>>>>        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader (UnboundedReadEvaluatorFactory.java:224)
>>>>        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement (UnboundedReadEvaluatorFactory.java:132)
>>>>        at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:160)
>>>>        at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:124)
>>>>        at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>>>>        at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>>>>        at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>>>>        at java.lang.Thread.run (Thread.java:748)
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Praveen K Viswanathan
>>>>
>>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan
