Hello fellow spark devs, hope you are doing fabulous,

Dropping a brain dump here about the Spark Kinesis integration. I am able
to get Spark Kinesis to work perfectly under ideal conditions, but I see a
lot of loose ends when things are not so ideal. Most of these loose ends are
specific to Kinesis and need attention.

Unlike Kafka, which works flawlessly with Spark, Kinesis has its own set
of issues involving throttling and recovery. I would like to volunteer to
fix a few such issues we have faced. Proposing some critical improvements
that would make Kinesis more reliable with Spark:

1. Kinesis retries : Right now the Kinesis retry interval is hard coded to a
very small value of 100 ms. While this is fine for normal fetch errors, it is
not fine for throttling errors. Kinesis throttling is a lot more frequent in
a recovery situation (more on that in point 3). The throttling limits are per
second, so retrying after 100 ms will not get us very far. We should make
these values configurable to have more control over the retries; a rough
sketch follows below. https://github.com/apache/spark/pull/17467
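
To illustrate what I mean, something like this (the config key names here
are just placeholders, not final, and the defaults mirror today's behaviour):

import org.apache.spark.SparkConf

// Hypothetical keys; the actual names would be settled in the PR review.
val conf = new SparkConf()
  .set("spark.streaming.kinesis.retry.waitTime", "2000ms")   // wait between retries
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")    // attempts before giving up

// Inside the receiver, the retry loop would read these instead of the constant:
def retryWaitTimeMs(conf: SparkConf): Long =
  conf.getTimeAsMs("spark.streaming.kinesis.retry.waitTime", "100ms")

def maxRetries(conf: SparkConf): Int =
  conf.getInt("spark.streaming.kinesis.retry.maxAttempts", 3)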

2. Kinesis recovery from a provided timestamp : The Kinesis Client Library
can resume from a TIMESTAMP, which is very useful when we want to start from
a specified time in the past rather than reading from the initial position.
We have a few scenarios where we need to read a huge stream and filter out
all data before a timestamp. This change would push the filtering down to the
Kinesis client so we fetch only the relevant data (see the KCL sketch below).
I have a half-baked patch for this.
https://issues.apache.org/jira/browse/SPARK-20168
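
The KCL side of this already exists; roughly (plain KCL, nothing
Spark-specific -- the Spark-facing API for passing the timestamp down is
what the patch would add, and the app/stream names are just examples):

import java.util.Date
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration

// Start reading 6 hours back instead of from the start of the stream.
val startFrom = new Date(System.currentTimeMillis() - 6L * 60 * 60 * 1000)

val kclConfig = new KinesisClientLibConfiguration(
    "my-app", "my-stream", new DefaultAWSCredentialsProviderChain(), "worker-1")
  // This sets the initial position to AT_TIMESTAMP with the given timestamp.
  .withTimestampAtInitialPositionInStream(startFrom)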

3. Kinesis recovery : This issue is a bit tricky, and I would need the help
of experienced contributors for a proper design.
In the normal case Kinesis uses the Kinesis consumer library, and the
library takes care of the requests made to the shards. But during recovery
the Kinesis consumer makes direct requests to Kinesis from all the tasks:
thousands of tasks hit all the shards (multiple requests per shard). Unlike
Kafka, where the requests are just network bound, Kinesis reads get
throttled if too many requests are made in a second. I am not sure what the
best way is to organize these requests, but we should have some way of
limiting the number of requests per shard. Our current workaround is to
raise the failed-task tolerance to a ridiculously high value, and the job
completes after a thousand failures. Point 1 above can also be used in this
workaround to delay the retries so that the request volume per shard
decreases (a rough example follows below). I don't have a ticket for this
yet but will file one soon.
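
For the record, the workaround in config terms is roughly this (the values
are examples, and the retry key is the hypothetical one from point 1):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.task.maxFailures", "100")                      // absurdly high failure tolerance
  .set("spark.streaming.kinesis.retry.waitTime", "5000ms")   // slow retries so per-shard volume drops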

4. Spark doesn't work with AWS temporary tokens : Currently Spark just reads
the AWS credentials using the various standard strategies. Temporary session
tokens are a more secure and (probably) encouraged way of using AWS
credentials. Temporary tokens are also useful for accessing cross-account
AWS resources. Not a lot of use cases rely on temporary tokens today, but
this will soon be a common scenario. We are already facing some minor
challenges and resorting to workarounds until this is supported in Spark. We
should have a mechanism to make Spark work with temporary AWS tokens; a
sketch of the flow is below. Thoughts ?
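
What we would like to feed into the Kinesis receiver is, roughly, a
credentials provider built from STS temporary credentials. The calls below
are standard AWS SDK; only wiring such a provider into Spark is the proposed
change, and the role ARN is just an example:

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicSessionCredentials}
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest

val sts = AWSSecurityTokenServiceClientBuilder.defaultClient()
val creds = sts.assumeRole(
    new AssumeRoleRequest()
      .withRoleArn("arn:aws:iam::123456789012:role/cross-account-kinesis")
      .withRoleSessionName("spark-kinesis"))
  .getCredentials

// The session token is the third piece that plain access-key/secret-key
// handling drops, which is why temporary credentials don't work today.
val provider = new AWSStaticCredentialsProvider(
  new BasicSessionCredentials(
    creds.getAccessKeyId, creds.getSecretAccessKey, creds.getSessionToken))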

I am trying to get familiar with Spark's code base along the way. I will
be primarily focusing on the spark-kinesis bits for now. I would like to
have your valuable inputs on these.

Cheers,
Yash
