[ https://issues.apache.org/jira/browse/FLINK-18056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Gao updated FLINK-18056: ---------------------------- Component/s: Connectors / Hive > Hive file sink throws exception when the target in-progress file exists. > ------------------------------------------------------------------------ > > Key: FLINK-18056 > URL: https://issues.apache.org/jira/browse/FLINK-18056 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem, Connectors / Hive > Reporter: Yun Gao > Priority: Blocker > Fix For: 1.11.0 > > > Currently after failover or restart, the Hive file sink will try to overwrite > the data since the last checkpoint, however, currently neither the > in-progress file is deleted nor hive uses the overwritten mode, thus an > exception occurs after restarting: > {code:java} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): > failed to create file > /user/hive/warehouse/datalake/dt=2020-06-01/hr=22/.part-0-10.inprogress for > DFSClient_NONMAPREDUCE_-1017064593_62 for client 100.96.206.42 because > current leaseholder is trying to recreate file. > {code} > The full stack of the exception is > {code:java} > org.apache.flink.connectors.hive.FlinkHiveException: > org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create > Hive RecordWriter > at > org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159) > at > org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47) > at > org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:234) > at > org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:207) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:284) > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) > at > org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at StreamExecCalc$16.processElement(Unknown Source) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at > org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at StreamExecCalc$2.processElement(Unknown Source) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed > to create Hive RecordWriter > at > org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58) > at > org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151) > ... 36 more > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55) > ... 37 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): > failed to create file > /user/hive/warehouse/datalake/dt=2020-06-01/hr=22/.part-0-10.inprogress for > DFSClient_NONMAPREDUCE_-1017064593_62 for client 100.96.206.42 because > current leaseholder is trying to recreate file. > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3075) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2783) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2676) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2561) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:593) > at > org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.create(AuthorizationProviderProxyClientProtocol.java:111) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:393) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080) > at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489) > at org.apache.hadoop.ipc.Client.call(Client.java:1435) > at org.apache.hadoop.ipc.Client.call(Client.java:1345) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116) > at com.sun.proxy.$Proxy35.create(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:297) > at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155) > at > org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346) > at com.sun.proxy.$Proxy36.create(Unknown Source) > at > org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:265) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1274) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1216) > at > org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:473) > at > org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:470) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:470) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:411) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) > at parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:176) > at parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:160) > at > parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:289) > at > parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:267) > at > org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:66) > at > org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125) > at > org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114) > at > org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:261) > ... 41 more > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)