PRIYESH RANJAN created CARBONDATA-4276:
------------------------------------------
Summary: writestream fail when csv is copied to readstream hdfs
path
Key: CARBONDATA-4276
URL: https://issues.apache.org/jira/browse/CARBONDATA-4276
Project: CarbonData
Issue Type: Bug
Components: data-load
Affects Versions: 2.2.0
Environment: Spark 2.4.5
Reporter: PRIYESH RANJAN
*steps :*
*+In hdfs execute following command :+*
cd /opt/HA/C10/install/hadoop/datanode/bin/
./hdfs dfs -rm -r /tmp/stream_test/checkpoint_all_data
./hdfs dfs -mkdir -p
/tmp/stream_test/\{checkpoint_all_data,bad_records_all_data}
./hdfs dfs -mkdir -p /Priyesh/streaming/csv/
./hdfs dfs -cp /chetan/100_olap_C20.csv /Priyesh/streaming/csv/
./hdfs dfs -cp /Priyesh/streaming/csv/100_olap_C20.csv
/Priyesh/streaming/csv/100_olap_C21.csv
*+From Spark-beeline /Spark-sql /Spark-shell, execute :+*
DROP TABLE IF EXISTS all_datatypes_2048;
create table all_datatypes_2048 (imei string,deviceInformationId int,MAC
string,deviceColor string,device_backColor string,modelId string,marketName
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series
string,productionDate timestamp,bomCode string,internalModels string,
deliveryTime string, channelsId string, channelsName string , deliveryAreaId
string, deliveryCountry string, deliveryProvince string, deliveryCity
string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string,
ActiveCheckTime string, ActiveAreaId string, ActiveCountry string,
ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion
string, Active_operaSysVersion string, Active_BacVerNumber string,
Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string,
Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int,
Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string,
Latest_country string, Latest_province string, Latest_city string,
Latest_district string, Latest_street string, Latest_releaseId string,
Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber
string, Latest_BacFlashVer string, Latest_webUIVersion string,
Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string,
Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string,
Latest_operatorId string, gamePointDescription string,gamePointId
double,contractNumber BigInt) stored as carbondata
TBLPROPERTIES('table_blocksize'='2048','streaming'='true',
'sort_columns'='imei');
*+From Spark-shell ,execute :+*
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
val df_j=spark.readStream.text("hdfs://hacluster/Priyesh/streaming/csv/*.csv")
df_j.writeStream.format("carbondata").option("dbName","ranjan").option("carbon.stream.parser","org.apache.carbondata.streaming.parser.CSVStreamParserImp").option("checkpointLocation",
"hdfs://hacluster/tmp/stream_test/checkpoint_all_data").option("bad_records_action","hdfs://hacluster/tmp/stream_test/bad_records_all_data").option("tableName","all_datatypes_2048").trigger(ProcessingTime(6000)).option("carbon.streaming.auto.handoff.enabled","true").option("carbon.streaming.segment.max.size",102400).start
show segments for table all_datatypes_2048;
*issue 1 :*
*+when copy csv file in hdfs folder for 1st time after streaming started
,writestream fails with error:+*
scala>
df_j.writeStream.format("carbondata").option("dbName","ranjan").option("carbon.stream.parser","org.apache.carbondata.streaming.parser.CSVStreamParserImp").option("checkpointLocation",
"hdfs://hacluster/tmp/stream_test/checkpoint_all_data").option("bad_records_action","hdfs://hacluster/tmp/stream_test/bad_records_all_data").option("tableName","all_datatypes_2048").trigger(ProcessingTime(6000)).option("carbon.streaming.auto.handoff.enabled","true").option("carbon.streaming.segment.max.size",102400).start
21/08/26 12:53:11 WARN CarbonProperties: The enable mv value "null" is invalid.
Using the default value "true"
21/08/26 12:53:11 WARN CarbonProperties: The value "LOCALLOCK" configured for
key carbon.lock.type is invalid for current file system. Use the default value
HDFSLOCK instead.
21/08/26 12:53:12 WARN HiveConf: HiveConf of name
hive.metastore.rdb.password.decode.enable does not exist
21/08/26 12:53:12 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled
does not exist
21/08/26 12:53:13 WARN HiveConf: HiveConf of name
hive.metastore.rdb.password.decode.enable does not exist
21/08/26 12:53:13 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled
does not exist
21/08/26 12:53:14 WARN ObjectStore: Failed to get database global_temp,
returning NoSuchObjectException
res0: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@ad038f8
scala> 21/08/26 13:00:49 WARN DFSClient: DataStreamer Exception
java.io.IOException: Failed to replace a bad datanode on the existing pipeline
due to no more good datanodes being available to try. (Nodes:
current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK],
DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]],
original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK],
DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]).
The current failed datanode replacement policy is DEFAULT, and a client may
configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy'
in its configuration.
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:925)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:988)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1156)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)
21/08/26 13:00:49 ERROR CarbonUtil: Error while closing
stream:java.io.IOException: Failed to replace a bad datanode on the existing
pipeline due to no more good datanodes being available to try. (Nodes:
current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK],
DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]],
original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK],
DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]).
The current failed datanode replacement policy is DEFAULT, and a client may
configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy'
in its configuration.
*issue 2 :*
*+when copy csv file in hdfs folder for 2nd time after streaming started
,writestreaming fails with :+*
21/08/26 13:01:36 ERROR StreamSegment: Failed to append batch data to stream
segment:
hdfs://hacluster/user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
Failed to APPEND_FILE
/user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata
for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because
DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
at
org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)
at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy17.append(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy18.append(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
at
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
at
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
at
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
at
org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
at
org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
at
org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
at
org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
at
org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
21/08/26 13:01:36 ERROR Utils: Aborting task
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
Failed to APPEND_FILE
/user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata
for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because
DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
at
org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)
at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy17.append(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy18.append(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
at
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
at
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
at
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
at
org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
at
org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
at
org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
at
org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
at
org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
at
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)