OK, there is already a try-with-resources block that closes the TableLoader in FlinkInputFormat [1], so we don't need the extra try-with-resources from PR 2051 (I will close that PR).
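For anyone skimming the thread, the pattern at [1] is roughly the following. This is a minimal sketch with a hypothetical method name, not the exact code in FlinkInputFormat:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.TableLoader;

    // Sketch: open the loader, then let try-with-resources close it.
    // Closing the loader closes the underlying catalog, which releases
    // the Hive metastore connection.
    static Table loadTableForPlanning(TableLoader tableLoader) {
      tableLoader.open();
      try (TableLoader loader = tableLoader) {
        return loader.loadTable();
      } catch (IOException e) {
        throw new UncheckedIOException("Failed to close table loader", e);
      }
    }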
On my host, I could not reproduce the connection leak when running
TestFlinkInputFormatReaderDeletes. Is there some extra use of the table
loader in your FLIP-27 dev branch that you forgot to close?

[1]. https://github.com/apache/iceberg/blob/7645ceba65044184be192a7194a38729133b2e50/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java#L77

On Fri, Jan 8, 2021 at 3:36 PM OpenInx <[email protected]> wrote:

>> I was able to almost 100% reproduce the HiveMetaStoreClient aborted
>> connection problem locally with Flink tests after adding
>> another DeleteReadTests for the new FLIP-27 source impl in my dev branch
>
> I think I found why it fails so easily. TestFlinkInputFormatReaderDeletes
> creates a new CatalogLoader [1] for loading the table inside the
> FlinkInputFormat:
>
> TestHelpers.readRowData(inputFormat, rowType).forEach(rowData -> {
>   RowDataWrapper wrapper = new RowDataWrapper(rowType, projected.asStruct());
>   set.add(wrapper.wrap(rowData));
> });
>
> When TestHelpers#readRowData runs, it opens a new catalog (which means
> opening a new Hive connection), but after the read finishes we never
> close the TableLoader, which leaks the catalog connection. I opened a
> PR [2] to fix this issue (see the sketch after this thread); will it
> work in your branch?
>
> I think it's worth keeping those Hive catalog unit tests so that we can
> detect connection leak issues like this in time.
>
> [1].
> https://github.com/apache/iceberg/blob/4436c92928f4b3b90839a26bf6a656902733261f/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormatReaderDeletes.java#L114
> [2]. https://github.com/apache/iceberg/pull/2051/files
>
> On Fri, Jan 8, 2021 at 5:48 AM Steven Wu <[email protected]> wrote:
>
>> Ryan/OpenInx, thanks a lot for the pointers.
>>
>> I was able to almost 100% reproduce the HiveMetaStoreClient aborted
>> connection problem locally with Flink tests after adding another
>> DeleteReadTests for the new FLIP-27 source impl in my dev branch. I no
>> longer see the problem after switching the Flink DeleteReadTests from
>> the HiveCatalog (which requires the expensive TestHiveMetastore) to
>> the HadoopCatalog.
>>
>> There is still a base test class, FlinkTestBase, that uses the
>> HiveCatalog. I am wondering whether there is any value in using the
>> more expensive HiveCatalog over the HadoopCatalog?
>>
>> On Wed, Jan 6, 2021 at 6:22 PM OpenInx <[email protected]> wrote:
>>
>>> I encountered a similar issue when adding hive-site.xml support for
>>> the Flink Hive catalog. Here is the earlier discussion and solution:
>>> https://github.com/apache/iceberg/pull/1586#discussion_r509453461
>>>
>>> It's a connection leak issue.
>>>
>>> On Thu, Jan 7, 2021 at 10:06 AM Ryan Blue <[email protected]>
>>> wrote:
>>>
>>>> I've noticed this too. I haven't had a chance to track down what's
>>>> causing it yet. I've seen it in Spark tests as well, so there may be
>>>> a problem that affects both, probably a connection leak in the
>>>> common code.
>>>>
>>>> On Wed, Jan 6, 2021 at 3:44 PM Steven Wu <[email protected]> wrote:
>>>>
>>>>> I have noticed some flakiness in the Flink and Spark tests, both
>>>>> locally and in CI checks. @zhangjun0x01 also reported the same
>>>>> problem with iceberg-spark3-extensions. Below is a full stack trace
>>>>> from a local run of the Flink tests.
>>>>>
>>>>> The flakiness might be a recent regression, as the tests were
>>>>> stable for me until recently. Was there a recent Hive dependency
>>>>> change? Does anyone have any ideas?
>>>>>
>>>>> org.apache.iceberg.flink.source.TestIcebergSourceReaderDeletes > testMixedPositionAndEqualityDeletes[fileFormat=ORC] FAILED
>>>>>
>>>>> java.lang.RuntimeException: Failed to get table info from metastore default.test
>>>>>   at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:142)
>>>>>   at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:86)
>>>>>   at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:69)
>>>>>   at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:92)
>>>>>   at org.apache.iceberg.flink.TableLoader$CatalogTableLoader.loadTable(TableLoader.java:113)
>>>>>   at org.apache.iceberg.flink.source.TestIcebergSourceReaderDeletes.rowSet(TestIcebergSourceReaderDeletes.java:90)
>>>>>
>>>>> Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Broken pipe (Write failed)
>>>>>   at org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:161)
>>>>>   at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:73)
>>>>>   at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:62)
>>>>>   at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.send_get_table_req(ThriftHiveMetastore.java:1561)
>>>>>   at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table_req(ThriftHiveMetastore.java:1553)
>>>>>   at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1350)
>>>>>   at org.apache.iceberg.hive.HiveTableOperations.lambda$doRefresh$0(HiveTableOperations.java:130)
>>>>>   at org.apache.iceberg.hive.ClientPool.run(ClientPool.java:65)
>>>>>   at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:130)
>>>>>   ... 5 more
>>>>>
>>>>> Caused by: java.net.SocketException: Broken pipe (Write failed)
>>>>>   at java.net.SocketOutputStream.socketWrite0(Native Method)
>>>>>   at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
>>>>>   at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
>>>>>   at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>>>>   at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>>>>>   at org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:159)
>>>>>   ... 13 more
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>
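P.S. For completeness, the fix discussed in PR 2051 above is simply to give
the test ownership of the TableLoader's lifecycle. A minimal sketch of that
pattern, assuming the test builds its loader with TableLoader.fromCatalog as
the linked test does (the helper name here is hypothetical, not the PR's
actual diff):

    import java.io.IOException;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.flink.CatalogLoader;
    import org.apache.iceberg.flink.TableLoader;

    // Hypothetical test helper: try-with-resources guarantees that the
    // catalog connection opened by loader.open() is released even when
    // the read fails, instead of leaking one Hive connection per test.
    static Table loadTableOnce(CatalogLoader catalogLoader, TableIdentifier ident)
        throws IOException {
      try (TableLoader loader = TableLoader.fromCatalog(catalogLoader, ident)) {
        loader.open();
        return loader.loadTable();
      }
    }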
