[ https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406293#comment-15406293 ]

ASF GitHub Bot commented on FLINK-4311:
---------------------------------------

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2330
  
    I don't know, and it seems the InputFormat itself doesn't know either. If we go by the previous implementation then yes, there will only be one table. However, based on the comment on line 64, `// abstract methods allow for multiple table and scanners in the same job`, we have to conclude that there can be different tables.
    
    I'd be curious what @twalthr thinks about this.


> TableInputFormat fails when reused on next split
> ------------------------------------------------
>
>                 Key: FLINK-4311
>                 URL: https://issues.apache.org/jira/browse/FLINK-4311
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.3
>            Reporter: Niels Basjes
>            Assignee: Niels Basjes
>            Priority: Critical
>
> We have written a batch job that reads data from HBase using the
> TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>       at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
>       at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
>       at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
>       at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
>       at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
>       at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
>       at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
>       at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
>       at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
>       at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>       at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>       at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>       at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
>       at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
>       at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
>       at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
>       at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
>       ... 10 more
> {quote}
> As you can see the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the fact that 
> # the configure method opens the table
> # the open method obtains the result scanner
> # the close method closes the table.
> If a second split arrives on the same instance then the open method will fail 
> because the table has already been closed.
> We also found that this error varies with the versions of HBase that are 
> used. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
>       at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
>       at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
>       ... 37 more
> {quote}
> I found that the [documentation of the InputFormat interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html] clearly states:
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance 
> can be opened again after it was closed. That is due to the fact that the 
> input format is used for potentially multiple splits. After a split is done, 
> the format's close function is invoked and, if another split is available, 
> the open function is invoked afterwards for the next split.{quote}
> It appears that this specific InputFormat has not been checked against this 
> constraint.
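
The lifecycle described in the report (configure opens the table, open obtains the scanner, close closes the table) can be reproduced without HBase. The sketch below is only an illustration under stated assumptions: `FakeTable`, `SplitReader`, `BrokenFormat` and `ReopenableFormat` are hypothetical names, not Flink or HBase classes, and the thread pool merely stands in for the executor that the stack trace shows as terminated. Acquiring the table in open is one possible way to satisfy the reopen requirement; it is not necessarily what the pull request does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ReopenDemo {

    /** Hypothetical stand-in for a table handle that owns a thread pool. */
    static class FakeTable implements AutoCloseable {
        private final ExecutorService pool = Executors.newSingleThreadExecutor();

        /** Submitting to a pool that has been shut down throws RejectedExecutionException. */
        String scan(String split) throws Exception {
            return pool.submit(() -> "scanned " + split).get();
        }

        @Override
        public void close() {
            pool.shutdownNow();
        }
    }

    /** Minimal lifecycle for this sketch; this is not Flink's InputFormat interface. */
    interface SplitReader {
        void configure();
        String open(String split) throws Exception;
        void close();
    }

    /** Mirrors the reported behaviour: table opened in configure(), closed in close(). */
    static class BrokenFormat implements SplitReader {
        private FakeTable table;
        public void configure() { table = new FakeTable(); }
        public String open(String split) throws Exception { return table.scan(split); }
        public void close() { table.close(); }
    }

    /** Assumed fix: acquire the table in open() so that close() followed by open() works. */
    static class ReopenableFormat implements SplitReader {
        private FakeTable table;
        public void configure() { }
        public String open(String split) throws Exception {
            table = new FakeTable();
            return table.scan(split);
        }
        public void close() {
            if (table != null) {
                table.close();
                table = null;
            }
        }
    }

    /** Runs two splits on one instance; returns true if both could be opened. */
    static boolean survivesTwoSplits(SplitReader format) throws Exception {
        format.configure();
        try {
            for (String split : new String[] {"split-1", "split-2"}) {
                format.open(split);
                format.close();
            }
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("broken: " + survivesTwoSplits(new BrokenFormat()));
        System.out.println("reopenable: " + survivesTwoSplits(new ReopenableFormat()));
    }
}
```

In this sketch the broken variant fails on the second split because its pool was shut down by the first close, while the reopenable variant handles both splits.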



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
