[ 
https://issues.apache.org/jira/browse/KUDU-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479766#comment-17479766
 ] 

Alexey Serbin commented on KUDU-3347:
-------------------------------------

Regardless of the read mode used for scanning, the client issues write 
requests only to leader replicas.  If a replica has become a follower by the 
time a write request arrives, the client receives an appropriate error status 
and retries the write operation under the hood.
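
As a rough illustration only, here is a toy model of that behavior in plain 
Java. It does not use or reproduce the Kudu client code: Replica, 
writeWithRetry and findLeader are hypothetical names made up for this sketch, 
and the "not the leader" rejection is modeled as a simple boolean.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of leader-only writes with transparent retry; not the Kudu client API. */
final class LeaderRetrySketch {

    /** Hypothetical stand-in for a tablet replica. */
    static final class Replica {
        final String name;
        boolean leader;
        int appliedWrites = 0;

        Replica(String name, boolean leader) {
            this.name = name;
            this.leader = leader;
        }

        /** Returns true if the write was applied, false if this replica is not the leader. */
        boolean write(String row) {
            if (!leader) {
                return false;
            }
            appliedWrites++;
            return true;
        }
    }

    /**
     * Sends the write to the replica believed to be the leader. On a rejection,
     * looks up the current leader and retries, up to maxAttempts times.
     * Returns the number of attempts that were needed.
     */
    static int writeWithRetry(List<Replica> replicas, Replica believedLeader,
                              String row, int maxAttempts) {
        Replica target = believedLeader;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (target.write(row)) {
                return attempt;
            }
            // Rejected: refresh our view of who the leader is, then retry.
            target = findLeader(replicas, target);
        }
        throw new IllegalStateException(
            "no leader accepted the write after " + maxAttempts + " attempts");
    }

    /** Returns the first replica marked as leader, or the fallback if there is none. */
    static Replica findLeader(List<Replica> replicas, Replica fallback) {
        for (Replica r : replicas) {
            if (r.leader) {
                return r;
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        Replica a = new Replica("a", true);
        Replica b = new Replica("b", false);
        List<Replica> replicas = Arrays.asList(a, b);
        // Leadership moves from a to b just before the write reaches a.
        a.leader = false;
        b.leader = true;
        int attempts = writeWithRetry(replicas, a, "row-1", 3);
        System.out.println("attempts=" + attempts + ", applied on b=" + b.appliedWrites);
    }
}
```

In this model the caller never sees the rejection from the former leader; it 
only observes that the write took two attempts.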

The scanner keeps a reference to the server on which it was opened, so it sends 
requests to that server as needed when iterating over the scan results.
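
Since the scanner state lives on that one server, it is also subject to that 
server's inactivity TTL (60000 ms in the tserver log quoted below). The 
following is a minimal sketch in plain Java of how an idle gap longer than the 
TTL loses the scanner, and how periodic requests during the gap would keep it 
alive. ServerScanner and survivesGap are hypothetical names, the 15000 ms 
ping interval is an arbitrary value chosen for the example, and none of this 
is the actual Kudu implementation.

```java
/** Toy model of a server-side scanner with an inactivity TTL; not the Kudu API. */
final class ScannerTtlSketch {

    /** Hypothetical server-side scanner state, driven by an explicit clock. */
    static final class ServerScanner {
        final long ttlMs;
        long lastAccessMs;

        ServerScanner(long ttlMs, long nowMs) {
            this.ttlMs = ttlMs;
            this.lastAccessMs = nowMs;
        }

        /** True once the scanner has been idle for longer than its TTL. */
        boolean isExpired(long nowMs) {
            return nowMs - lastAccessMs > ttlMs;
        }

        /** Any client request (next batch or keep-alive) resets the idle timer. */
        void touch(long nowMs) {
            if (isExpired(nowMs)) {
                throw new IllegalStateException("scanner not found (it may have expired)");
            }
            lastAccessMs = nowMs;
        }
    }

    /**
     * Simulates a client that spends workMs between two scanner requests and,
     * if keepAliveMs > 0, pings the scanner every keepAliveMs during that work.
     * Returns true if the request after the gap still finds the scanner.
     */
    static boolean survivesGap(long ttlMs, long workMs, long keepAliveMs) {
        long now = 0;
        ServerScanner scanner = new ServerScanner(ttlMs, now);
        long end = now + workMs;
        try {
            if (keepAliveMs > 0) {
                while (now + keepAliveMs < end) {
                    now += keepAliveMs;
                    scanner.touch(now);
                }
            }
            scanner.touch(end);
            return true;
        } catch (IllegalStateException expired) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 62538 ms of inactivity against a 60000 ms TTL, as in the quoted log.
        System.out.println(survivesGap(60_000, 62_538, 0));       // false
        // Same gap, but with a ping every 15000 ms (arbitrary interval).
        System.out.println(survivesGap(60_000, 62_538, 15_000));  // true
    }
}
```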

Hope this helps.

> kudu java client throws "scanner expired error" in spark app to delete rows
> ---------------------------------------------------------------------------
>
>                 Key: KUDU-3347
>                 URL: https://issues.apache.org/jira/browse/KUDU-3347
>             Project: Kudu
>          Issue Type: Bug
>            Reporter: Redriver
>            Priority: Major
>             Fix For: n/a
>
>
> I encountered a similar issue. I used Spark to delete rows, but 
> unfortunately, the Spark job always failed because a lot of tasks 
> encountered "Scanner xxx not found". On the Kudu tserver, I also found that 
> the scanners had expired.
> The Spark error:
> {code:java}
>  
> Error: Error running query: org.apache.spark.SparkException: Job aborted due to stage failure: Task 111 in stage 17.0 failed 4 times, most recent failure: Lost task 111.3 in stage 17.0 (TID 1593, hdc47-mcc10-01-0710-3905-026-tess0035.stratus.rno.ebay.com, executor 91): java.lang.RuntimeException: org.apache.kudu.client.NonRecoverableException: Scanner 7a365cf5dd1246419b127ece97593817 not found (it may have expired)
>     at org.apache.kudu.client.KuduScannerIterator.hasNext(KuduScannerIterator.java:77)
>     at org.apache.kudu.spark.kudu.RowIterator.hasNext(KuduRDD.scala:170)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:50)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:730)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>     at scala.collection.Iterator.foreach(Iterator.scala:941)
>     at scala.collection.Iterator.foreach$(Iterator.scala:941)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>     at org.apache.kudu.spark.kudu.KuduContext.writePartitionRows(KuduContext.scala:482)
>     at org.apache.kudu.spark.kudu.KuduContext.$anonfun$writeRows$5(KuduContext.scala:392)
>     at org.apache.kudu.spark.kudu.KuduContext.$anonfun$writeRows$5$adapted(KuduContext.scala:385)
>     at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1035)
>     at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1035)
>     at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2275)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:129)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:486)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1391)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:489)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kudu.client.NonRecoverableException: Scanner 7a365cf5dd1246419b127ece97593817 not found (it may have expired)
>     at org.apache.kudu.client.KuduException.transformException(KuduException.java:110)
>     at org.apache.kudu.client.KuduClient.joinAndHandleException(KuduClient.java:443)
>     at org.apache.kudu.client.KuduScanner.nextRows(KuduScanner.java:81)
>     at org.apache.kudu.client.KuduScannerIterator.hasNext(KuduScannerIterator.java:69)
>     ... 23 more
>     Suppressed: org.apache.kudu.client.KuduException$OriginalException: Original asynchronous stack trace
>         at org.apache.kudu.client.RpcProxy.dispatchTSError(RpcProxy.java:358)
>         at org.apache.kudu.client.RpcProxy.responseReceived(RpcProxy.java:277)
>         at org.apache.kudu.client.RpcProxy.access$000(RpcProxy.java:63)
>         at org.apache.kudu.client.RpcProxy$1.call(RpcProxy.java:157)
>         at org.apache.kudu.client.RpcProxy$1.call(RpcProxy.java:153)
>         at org.apache.kudu.client.Connection.channelRead0(Connection.java:357)
>         at org.apache.kudu.shaded.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>         at org.apache.kudu.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>         at org.apache.kudu.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>         at org.apache.kudu.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>         at org.apache.kudu.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
>         at org.apache.kudu.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
>         at org.apache.kudu.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>         at org.apache.kudu.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>         at org.apache.kudu.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> {code}
>  
> The Kudu-tserver error:
>  
> {code:java}
> I0114 02:16:09.430703 39491 scanners.cc:280] Expiring scanner id: 9694bcb5ae244be2be6e27b9293ff8b7, of tablet 44fa35c99e7042329bbfa0268c1cd4de, after 62106 ms of inactivity, which is > TTL (60000 ms).
> I0114 02:16:09.432278 39491 scanners.cc:280] Expiring scanner id: 7a365cf5dd1246419b127ece97593817, of tablet 44fa35c99e7042329bbfa0268c1cd4de, after 62538 ms of inactivity, which is > TTL (60000 ms).
> I0114 02:16:09.433409 39491 scanners.cc:280] Expiring scanner id: 3de03b4250074a6daaeb7a9470f9d447, of tablet 44fa35c99e7042329bbfa0268c1cd4de, after 63128 ms of inactivity, which is > TTL (60000 ms).
> I0114 02:16:09.434437 39491 scanners.cc:280] Expiring scanner id: a4d7e914d64f4927b87904570ae93df6, of tablet 44fa35c99e7042329bbfa0268c1cd4de, after 61813 ms of inactivity, which is > TTL (60000 ms).
> I0114 02:16:09.435580 39491 scanners.cc:280] Expiring scanner id: 1b09febbaed2448fa69ebc041dc630ae, of tablet 44fa35c99e7042329bbfa0268c1cd4de, after 63365 ms of inactivity, which is > TTL (60000 ms).
> {code}
> It looks like the following code has a potential issue:
>  
> {code:java}
>     // Write the rows for each Spark partition.
>     rdd.foreachPartition(iterator => {
>       val pendingErrors = writePartitionRows(
>         iterator,
>         schema,
>         tableName,
>         adjustedOperation,
>         lastPropagatedTimestamp,
>         adjustedWriteOptions)
>       if (pendingErrors.getRowErrors.nonEmpty) {
>         val errors = pendingErrors.getRowErrors
>         val sample = errors.take(5).map(_.getErrorStatus).mkString
>         if (pendingErrors.isOverflowed) {
>           throw new RuntimeException(
>             s"PendingErrors overflowed. Failed to write at least ${errors.length} rows " +
>               s"to Kudu; Sample errors: $sample")
>         } else {
>           throw new RuntimeException(
>             s"Failed to write ${errors.length} rows to Kudu; Sample errors: $sample")
>         }
>       }
>     })
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
