PRIYESH RANJAN created CARBONDATA-4276:
------------------------------------------

             Summary: writeStream fails when a CSV file is copied to the readStream HDFS path
                 Key: CARBONDATA-4276
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-4276
             Project: CarbonData
          Issue Type: Bug
          Components: data-load
    Affects Versions: 2.2.0
         Environment: Spark 2.4.5
            Reporter: PRIYESH RANJAN


*Steps:*

*+In HDFS, execute the following commands:+*

 cd /opt/HA/C10/install/hadoop/datanode/bin/
./hdfs dfs -rm -r /tmp/stream_test/checkpoint_all_data
./hdfs dfs -mkdir -p /tmp/stream_test/{checkpoint_all_data,bad_records_all_data}
./hdfs dfs -mkdir -p /Priyesh/streaming/csv/
./hdfs dfs -cp /chetan/100_olap_C20.csv /Priyesh/streaming/csv/

./hdfs dfs -cp /Priyesh/streaming/csv/100_olap_C20.csv /Priyesh/streaming/csv/100_olap_C21.csv

 

*+From Spark-beeline / Spark-sql / Spark-shell, execute:+*

DROP TABLE IF EXISTS all_datatypes_2048;
create table all_datatypes_2048 (imei string,deviceInformationId int,MAC 
string,deviceColor string,device_backColor string,modelId string,marketName 
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series 
string,productionDate timestamp,bomCode string,internalModels string, 
deliveryTime string, channelsId string, channelsName string , deliveryAreaId 
string, deliveryCountry string, deliveryProvince string, deliveryCity 
string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, 
ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, 
ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet 
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion 
string, Active_operaSysVersion string, Active_BacVerNumber string, 
Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, 
Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, 
Latest_country string, Latest_province string, Latest_city string, 
Latest_district string, Latest_street string, Latest_releaseId string, 
Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber 
string, Latest_BacFlashVer string, Latest_webUIVersion string, 
Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, 
Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, 
Latest_operatorId string, gamePointDescription string,gamePointId 
double,contractNumber BigInt) stored as carbondata 
TBLPROPERTIES('table_blocksize'='2048','streaming'='true', 
'sort_columns'='imei');

 

*+From Spark-shell, execute:+*

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

val df_j=spark.readStream.text("hdfs://hacluster/Priyesh/streaming/csv/*.csv")

df_j.writeStream.format("carbondata").option("dbName","ranjan").option("carbon.stream.parser","org.apache.carbondata.streaming.parser.CSVStreamParserImp").option("checkpointLocation","hdfs://hacluster/tmp/stream_test/checkpoint_all_data").option("bad_records_action","hdfs://hacluster/tmp/stream_test/bad_records_all_data").option("tableName","all_datatypes_2048").trigger(ProcessingTime(6000)).option("carbon.streaming.auto.handoff.enabled","true").option("carbon.streaming.segment.max.size",102400).start

show segments for table all_datatypes_2048;

 

*Issue 1:*

*+When a CSV file is copied into the HDFS folder for the first time after streaming has started, writeStream fails with the error:+*

scala> 
df_j.writeStream.format("carbondata").option("dbName","ranjan").option("carbon.stream.parser","org.apache.carbondata.streaming.parser.CSVStreamParserImp").option("checkpointLocation",
 
"hdfs://hacluster/tmp/stream_test/checkpoint_all_data").option("bad_records_action","hdfs://hacluster/tmp/stream_test/bad_records_all_data").option("tableName","all_datatypes_2048").trigger(ProcessingTime(6000)).option("carbon.streaming.auto.handoff.enabled","true").option("carbon.streaming.segment.max.size",102400).start
21/08/26 12:53:11 WARN CarbonProperties: The enable mv value "null" is invalid. 
Using the default value "true"
21/08/26 12:53:11 WARN CarbonProperties: The value "LOCALLOCK" configured for 
key carbon.lock.type is invalid for current file system. Use the default value 
HDFSLOCK instead.
21/08/26 12:53:12 WARN HiveConf: HiveConf of name 
hive.metastore.rdb.password.decode.enable does not exist
21/08/26 12:53:12 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled 
does not exist
21/08/26 12:53:13 WARN HiveConf: HiveConf of name 
hive.metastore.rdb.password.decode.enable does not exist
21/08/26 12:53:13 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled 
does not exist
21/08/26 12:53:14 WARN ObjectStore: Failed to get database global_temp, 
returning NoSuchObjectException
res0: org.apache.spark.sql.streaming.StreamingQuery = 
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@ad038f8

scala> 21/08/26 13:00:49 WARN DFSClient: DataStreamer Exception
java.io.IOException: Failed to replace a bad datanode on the existing pipeline 
due to no more good datanodes being available to try. (Nodes: 
current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK],
 
DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]],
 
original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK],
 
DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]).
 The current failed datanode replacement policy is DEFAULT, and a client may 
configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' 
in its configuration.
 at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:925)
 at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:988)
 at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1156)
 at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)
21/08/26 13:00:49 ERROR CarbonUtil: Error while closing 
stream:java.io.IOException: Failed to replace a bad datanode on the existing 
pipeline due to no more good datanodes being available to try. (Nodes: 
current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK],
 
DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]],
 
original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK],
 
DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]).
 The current failed datanode replacement policy is DEFAULT, and a client may 
configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' 
in its configuration.

*Issue 2:*

*+When a CSV file is copied into the HDFS folder for the second time after streaming has started, writeStream fails with:+*

 

21/08/26 13:01:36 ERROR StreamSegment: Failed to append batch data to stream 
segment: 
hdfs://hacluster/user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
 Failed to APPEND_FILE 
/user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata
 for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because 
DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
 at 
org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
 at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
 at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
 at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
 at org.apache.hadoop.ipc.Client.call(Client.java:1412)
 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
 at com.sun.proxy.$Proxy17.append(Unknown Source)
 at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy18.append(Unknown Source)
 at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
 at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
 at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
 at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
 at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
 at 
org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
 at 
org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
 at 
org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
 at 
org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
 at 
org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
21/08/26 13:01:36 ERROR Utils: Aborting task
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
 Failed to APPEND_FILE 
/user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata
 for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because 
DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
 at 
org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
 at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
 at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
 at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
 at org.apache.hadoop.ipc.Client.call(Client.java:1412)
 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
 at com.sun.proxy.$Proxy17.append(Unknown Source)
 at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy18.append(Unknown Source)
 at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
 at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
 at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
 at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
 at 
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
 at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
 at 
org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
 at 
org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
 at 
org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
 at 
org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
 at 
org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to