Hello,

we are using Cloudera: Kudu 1.2.0 with 5 tablet servers and 3 master servers,
Spark 2.0 running on 5 worker nodes.
Spark jobs are started in yarn-client mode with Python 2.7.10 via spark2-submit.

The code I'm running is essentially:

df = sqlContext.read.format('org.apache.kudu.spark.kudu')\
                    .option('kudu.master', self.kudu_master)\
                    .option('kudu.table', self.source_table)\
                    .load()
rdd = df.rdd.map(lambda x: compute_intensive_function(x))
print rdd.count()

After 2 to 10 minutes Spark reproducibly throws something like:

17/03/17 09:18:18 INFO scheduler.TaskSetManager: Starting task 2.1 in stage 1.0 (TID 5, xx.xx.de, executor 19, partition 3, NODE_LOCAL, 6133 bytes)
17/03/17 09:18:18 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 5 on executor id: 19 hostname: xx.xx.de.

17/03/17 09:20:40 WARN scheduler.TaskSetManager: Lost task 2.1 in stage 1.0 (TID 5, xx.xx.de, executor 19): org.apache.kudu.client.NonRecoverableException: Scanner not found
        at org.apache.kudu.client.TabletClient.dispatchTSErrorOrReturnException(TabletClient.java:557)

In the Kudu tablet server logs, something like this appears:

I0317 10:10:33.199148 28240 scanners.cc:165] Expiring scanner id: eb38ff5eb718426e85bc5bc22de7bdac, of tablet 4d316a9dd5424ceb91c91aff90290164, after 60314403 us of inactivity, which is > TTL (60000000 us).
I0317 10:13:03.201413 28240 scanners.cc:165] Expiring scanner id: 00d5bec1906241ee8749e354665497e9, of tablet 4d316a9dd5424ceb91c91aff90290164, after 62854357 us of inactivity, which is > TTL (60000000 us).
I0317 10:15:33.203809 28240 scanners.cc:165] Expiring scanner id: cabb68f96756463989e0abdbad1bbaaf, of tablet 4d316a9dd5424ceb91c91aff90290164, after 61797695 us of inactivity, which is > TTL (60000000 us).

Using Parquet rather than Kudu as the data source

df = sqlContext.read.parquet('/tmp/test/foo')
rdd = df.rdd.map(lambda x: compute_intensive_function(x))
print rdd.count()

everything works fine, even if a single computation task takes
15 minutes or more.

Among the Kudu flags I found this parameter:

--scanner_ttl_ms=60000

Raising it to a bigger value helps. But since our computation can
take up to 90 minutes on one executor, I would have to set
scanner_ttl_ms to such a large value that I fear side effects I
can't estimate.
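
For our worst case that would mean something on the order of

--scanner_ttl_ms=5400000

(90 min * 60 s * 1000 ms = 5,400,000 ms, i.e. 90 times the default of 60000).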

So my question is whether there are other parameters I should try
to alter, such as

--scanner_batch_size_rows=100
--scanner_default_batch_size_bytes=1048576
--scanner_max_batch_size_bytes=8388608
--scanner_max_wait_ms=1000

or parameters within Spark that could influence this behaviour in a desirable way.

The number of tasks Spark creates depends on the number of
partitions in Kudu. It would probably help to raise the number of
tasks, since each task would then need less computation time, which
should lead to fewer errors like the one reported above. However, I
don't know a performant and simple way to raise the number of tasks
at will, and I'm not sure this approach is desirable from a Spark
point of view. What would be an ideal number of Kudu partitions when
we have 5*20 cores on the Spark computation nodes?
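
One idea I have is to shuffle the data away from the Kudu scan before
running the expensive map, so the scan tasks finish quickly and the
long computation happens in a later stage where no scanner is kept
open. A rough sketch (the partition count of 200 is just a guess, and
the extra shuffle of course has its own cost):

df = sqlContext.read.format('org.apache.kudu.spark.kudu')\
                    .option('kudu.master', self.kudu_master)\
                    .option('kudu.table', self.source_table)\
                    .load()
# repartition() forces a shuffle: the Kudu scan tasks only feed the
# shuffle write and finish quickly, while compute_intensive_function
# runs in the next stage on shuffle data, not against an open scanner.
rdd = df.rdd.repartition(200).map(lambda x: compute_intensive_function(x))
print rdd.count()

Would something like that be a sensible approach?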


Thank you for your help
Frank
