Re: Unbounded sources unable to recover from checkpointMark when withMaxReadTime() is used

2020-07-28 Thread Maximilian Michels

Hi Mani,

The implementation of BoundedReadFromUnboundedSource currently doesn't 
allow that in the same way regular sources checkpoint. We would either 
have to convert it into a proper source (it's a DoFn at the moment), or 
store the checkpoint mark in Beam managed state (which will be 
checkpointed). The latter is probably easier.


-Max
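
A minimal sketch of the second idea above (keeping the reader's CheckpointMark
in Beam managed state so it survives a checkpoint/restore cycle). This is not
the actual BoundedReadFromUnboundedSource internals; the class name, the keyed
input shape, and the per-bundle record limit are assumptions for illustration:

  import java.io.IOException;
  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.io.UnboundedSource;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.ValueState;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Hypothetical sketch: store the CheckpointMark in managed state so a
  // restore can resume the reader where it left off.
  class CheckpointedReadFn<T, C extends UnboundedSource.CheckpointMark>
      extends DoFn<KV<Integer, UnboundedSource<T, C>>, T> {

    @StateId("checkpointMark")
    private final StateSpec<ValueState<C>> checkpointSpec;

    CheckpointedReadFn(Coder<C> checkpointCoder) {
      this.checkpointSpec = StateSpecs.value(checkpointCoder);
    }

    @ProcessElement
    public void process(
        @Element KV<Integer, UnboundedSource<T, C>> shard,
        @StateId("checkpointMark") ValueState<C> checkpointState,
        OutputReceiver<T> out,
        PipelineOptions options)
        throws IOException {
      // Resume from the restored mark if there is one, otherwise start fresh.
      C restoredMark = checkpointState.read();
      UnboundedSource.UnboundedReader<T> reader =
          shard.getValue().createReader(options, restoredMark);

      boolean available = reader.start();
      long read = 0;
      while (available && read < 1000) { // arbitrary per-bundle limit
        out.outputWithTimestamp(reader.getCurrent(), reader.getCurrentTimestamp());
        read++;
        available = reader.advance();
      }

      // Persist the latest mark so a restore after failure continues from here.
      @SuppressWarnings("unchecked")
      C newMark = (C) reader.getCheckpointMark();
      checkpointState.write(newMark);
      reader.close();
    }
  }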

On 27.07.20 21:43, Sunny, Mani Kolbe wrote:

Thank you Max. Is there reference code somewhere to implement checkpointing 
for BoundedReadFromUnboundedSource? I couldn't quite figure out where to get 
the restored checkpointMark. It appears to be specific to the runner implementations.

-Original Message-
From: Maximilian Michels 
Sent: Monday, July 27, 2020 3:04 PM
To: user@beam.apache.org; Sunny, Mani Kolbe 
Subject: Re: Unbounded sources unable to recover from checkpointMark when 
withMaxReadTime() is used



Hi Mani,

Just to let you know I think it would make sense to either

(1) implement checkpointing for BoundedReadFromUnboundedSource or
(2) throw an error in case of a provided checkpoint mark

Like you pointed out, ignoring it as we currently do does not seem like a 
feasible solution.

-Max
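
For option (2), a minimal hypothetical guard along these lines (the variable
name and its placement are assumptions, not the actual
BoundedReadFromUnboundedSource code):

  // Fail loudly instead of silently dropping a checkpoint mark the bounded
  // wrapper cannot honor.
  if (restoredCheckpointMark != null) {
    throw new UnsupportedOperationException(
        "BoundedReadFromUnboundedSource cannot resume from a checkpoint mark; "
            + "ignoring it silently would break exactly-once expectations.");
  }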

On 21.07.20 16:16, Sunny, Mani Kolbe wrote:

Hi Alexey,

I did explore that route. The problem there is identifying what
timestamp to use. While processing records, you can capture the
timestamp. This cannot be the processing time, but the event time on the
Kinesis record. As far as I can see, the event time on a Kinesis record is
generated from the ApproximateArrivalTimestamp field of the getRecords API
call. According to the Kinesis docs[1]: "There are no guarantees about the
time stamp accuracy, or that the time stamp is always increasing. For
example, records in a shard or across a stream might have time stamps that are 
out of order."

So there is a chance of skipping records if the timestamps captured at
checkpoint time were out of order. Also, there is no provision to provide a
per-shard timestamp when initializing the KinesisReader.

[1]
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

Regards,

Mani

*From:* Alexey Romanenko 
*Sent:* Tuesday, July 21, 2020 2:01 PM
*To:* user@beam.apache.org
*Subject:* Re: Unbounded sources unable to recover from checkpointMark
when withMaxReadTime() is used


Hi Mani,

Knowing when your last run was stopped (since you use batch mode),
could you leverage “withInitialPositionInStream()” or
“withInitialTimestampInStream()” for KinesisIO in this case?

Alexey
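
A minimal sketch of that idea, assuming the batch job records the end time of
its last successful run somewhere (the stream name, the timestamp value, and
the read bound are placeholders; AWS client/credentials setup is omitted;
Instant and Duration are joda-time):

  // Resume each batch run from where the previous one stopped, using a
  // timestamp instead of a restored CheckpointMark.
  Instant lastRunStoppedAt =
      Instant.parse("2020-07-20T00:00:00Z"); // hypothetical, loaded from job metadata

  PCollection<KinesisRecord> records =
      p.apply(
          KinesisIO.read()
              .withStreamName("my-stream")                     // placeholder
              .withInitialTimestampInStream(lastRunStoppedAt)  // resume point
              .withMaxReadTime(Duration.standardMinutes(10))); // keep the read bounded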



 On 21 Jul 2020, at 13:40, Sunny, Mani Kolbe <sun...@dnb.com> wrote:

 Hi Max,

 Thank you for your reply. Our use case is to run a batch job against
 a Kinesis source. Our downstream systems are still in batch mode, so
 this application will read from the Kinesis source periodically and
 generate batched outputs. Without the ability to resume from a
 checkpoint, it will be reading the entire stream every time.

 Regards,
 Mani

 -Original Message-
 From: Maximilian Michels <m...@apache.org>
 Sent: Tuesday, July 21, 2020 11:38 AM
 To: user@beam.apache.org
 Cc: Sunny, Mani Kolbe <sun...@dnb.com>
 Subject: Re: Unbounded sources unable to recover from checkpointMark
 when withMaxReadTime() is used



 Hi Mani,

 BoundedReadFromUnboundedSource was originally intended to be used in
 batch pipelines. In batch, runners typically do not perform
 checkpointing. In case of failures, they re-run the entire pipeline.

 Keep in mind that, even with checkpointing, reading for a finite
 time in the processing time domain from an unbounded source rarely
 gives consistent results across runs.

 However, ignoring the checkpoint looks problematic. We may want to
 fail during checkpointing to prevent violating correctness (e.g.
 exactly-once semantics).

 -Max

 On 21.07.20 11:36, Sunny, Mani Kolbe wrote:

 Observed on v2.22.0

 When withMaxReadTime() is used, Beam creates a
 BoundedReadFromUnboundedSource [1]. The ReadFn class in
 BoundedReadFromUnboundedSource is responsible for reading
 records from the source. You can see this class doesn't verify if
 there is
 a recoverable 

Re: Partition unbounded collection like Kafka source

2020-07-28 Thread wang Wu
Yes, my issue is the lag increasing. We are using the Spark runner. The source
is Kafka and the sink is Cassandra. We tune the batch interval and the max
records per batch, but the batch interval is still less than the processing
time of each batch, so it causes the latency. We tried applying Reshuffle with
a random key on the collection after we read from Kafka, but it does not help.
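
For reference, a sketch of the reshuffle attempt described above, assuming a
KafkaIO read without metadata producing KV<String, String> records (broker
address, topic, and deserializers are placeholders):

  // Read from Kafka, then redistribute records across workers before the sink.
  PCollection<KV<String, String>> fromKafka =
      p.apply(
          KafkaIO.<String, String>read()
              .withBootstrapServers("broker:9092")   // placeholder
              .withTopic("events")                   // placeholder
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .withoutMetadata());

  PCollection<KV<String, String>> reshuffled =
      fromKafka.apply(Reshuffle.viaRandomKey());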

On Tue, Jul 28, 2020, 00:27 Chamikara Jayalath  wrote:

> Probably you should apply the Partition[1] transform on the output
> PCollection of your read. Note though that the exact parallelization is
> runner dependent (for example, the runner might autoscale up, resulting in more
> writers).
> Did you run into issues when just reading from Kafka and writing to Cassandra
> (without manually controlling the parallelization)?
>
> Thanks,
> Cham
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
>
> On Thu, Jul 23, 2020 at 5:01 AM wang Wu  wrote:
>
>> Hi,
>> Suppose that a Kafka topic has only 3 partitions. Now we want to
>> partition it into 20 partitions, each of which will produce an output collection.
>> The purpose is to write to the sink in parallel from all 20 output
>> collections.
>>
>> Will this code achieve that purpose?
>>
>>  KafkaIO.Read reader =
>>   KafkaIO.read()
>>   .withConsumerFactoryFn(
>>   new ConsumerFactoryFn(
>>   topic, 10, numElements, OffsetResetStrategy.EARLIEST))
>> // 10 partitions
>>
>>   PCollection input =
>> p.apply(reader.withoutMetadata()).apply(KafkaToCassandraRow).apply(CassandraIO.write);
>>
>> Regards
>> Dinh
>>
>>
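
A minimal sketch of the Partition approach suggested above, assuming a
KV<String, String> collection read from Kafka; the partition count, the
hash-based partition function, and the toCassandraRow/cassandraWrite names are
placeholders standing in for the pipeline's existing steps:

  // Fan the Kafka output out into N branches and write each one independently.
  int numWriters = 20;

  PCollectionList<KV<String, String>> parts =
      fromKafka.apply(
          Partition.<KV<String, String>>of(
              numWriters,
              (record, n) -> Math.floorMod(record.getKey().hashCode(), n)));

  for (PCollection<KV<String, String>> part : parts.getAll()) {
    part.apply(toCassandraRow)    // existing KafkaToCassandraRow transform
        .apply(cassandraWrite);   // existing CassandraIO.write() configuration
  }

Note that, as mentioned above, the effective parallelism is still runner
dependent, so this controls the number of write branches rather than the
exact number of workers.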


Re: Exceptions: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-07-28 Thread Mohil Khare
Hello all,

I think I found the reason for the issue. Since the exception was thrown
by StreamingSideInputDoFnRunner.java, I realized that I recently added a side
input to one of my ParDos that does stateful transformations.
It looks like there is some issue when you add a side input (my side input
was coming via a Global window into a ParDo in a Fixed window) to a stateful DoFn.

As a workaround, instead of adding the side input to the stateful ParDo, I
introduced another ParDo that enriches the streaming data with the side input
before it flows into the stateful DoFn. That seems to have fixed the problem.


Thanks and regards
Mohil
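
A rough sketch of that workaround, with placeholder types and transform names
(Event, Enriched, MyStatefulDoFn, and the side-input source are assumptions,
not the actual pipeline):

  // The Global-window side input.
  PCollectionView<Map<String, String>> configView =
      configUpdates.apply(View.asMap());

  // Step 1: a stateless ParDo attaches the side-input data to each element.
  PCollection<KV<String, Enriched>> enriched =
      events.apply(
          "EnrichWithSideInput",
          ParDo.of(
                  new DoFn<KV<String, Event>, KV<String, Enriched>>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      Map<String, String> config = c.sideInput(configView);
                      c.output(
                          KV.of(
                              c.element().getKey(),
                              new Enriched(c.element().getValue(), config)));
                    }
                  })
              .withSideInputs(configView));

  // Step 2: the stateful DoFn consumes the enriched elements and needs no side input.
  enriched.apply("StatefulTransform", ParDo.of(new MyStatefulDoFn()));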



On Mon, Jul 27, 2020 at 10:50 AM Mohil Khare  wrote:

> Hello All,
>
> Any idea how to debug this and find out which stage, which DoFn, or which
> side input is causing the problem?
> Do I need to override OnTimer in every DoFn to avoid this problem?
> I thought that some uncaught exceptions were causing this and added
> various checks and exception handling in all DoFns, but I am still seeing this
> issue.
> It has been driving me nuts. And now, forget DRAIN, it happens during
> normal functioning as well. Any help would be appreciated.
>
> java.lang.UnsupportedOperationException: Attempt to deliver a timer to a
> DoFn, but timers are not supported in Dataflow.
>
>    1. at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.onTimer (StreamingSideInputDoFnRunner.java:86)
>    2. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360)
>    3. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600 (SimpleParDoFn.java:73)
>    4. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444)
>    5. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:473)
>    6. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:353)
>    7. at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish (ParDoOperation.java:52)
>    8. at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:85)
>    9. at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process (StreamingDataflowWorker.java:1350)
>    10. at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100 (StreamingDataflowWorker.java:152)
>    11. at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run (StreamingDataflowWorker.java:1073)
>    12. at java.util.concurrent.ThreadPoolExecutor.runWorker (