[ 
https://issues.apache.org/jira/browse/CARBONDATA-3926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yutao updated CARBONDATA-3926:
------------------------------
    Description: 
[https://github.com/apache/carbondata/blob/master/docs/flink-integration-guide.md]
 I followed this guide and used Spark SQL to create a CarbonData table; I can see its schema file:
 -rw-r--r-- 3 hadoop dc_cbss 2650 2020-07-25 21:06 
hdfs://beh/user/dc_cbss/warehouse/testyu.db/userpolicy/Metadata/schema

Then I wrote a Flink app and ran it on YARN.

It works: I can see the carbon files in the directory defined in my code:

val dataTempPath = "hdfs://beh/user/dc_cbss/temp/"

[dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls hdfs://beh/user/dc_cbss/temp/
 Found 10 items
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:35 
hdfs://beh/user/dc_cbss/temp/359a873ec9624623af9beae18b630fde
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:44 
hdfs://beh/user/dc_cbss/temp/372f6065515e41a5b1d5e01af0a78d61
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 
hdfs://beh/user/dc_cbss/temp/3735b94780484f96b211ff6d6974ce3a
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:38 
hdfs://beh/user/dc_cbss/temp/8411793f4c5547dc930aacaeea3177cd
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:29 
hdfs://beh/user/dc_cbss/temp/915ff23f0d9e4c2dab699d1dcc5a8b4e
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:32 
hdfs://beh/user/dc_cbss/temp/bea0bef07d5f47cd92541c69b16aa64e
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:26 
hdfs://beh/user/dc_cbss/temp/c42c760144da4f9d83104af270ed46c1
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:41 
hdfs://beh/user/dc_cbss/temp/d8af69e47a5844a3a8ed7090ea13a278
 drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 
hdfs://beh/user/dc_cbss/temp/db6dceb913444c92a3453903fb50f486
 [dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/
 Found 8 items
 -rw-r--r-- 3 dc_cbss dc_cbss 3100 2020-07-27 14:45 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.carbonindex
 -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.carbonindex
 -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.carbonindex
 -rw-r--r-- 3 dc_cbss dc_cbss 3110 2020-07-27 14:46 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.carbonindex
 -rw-r--r-- 3 dc_cbss dc_cbss 54526 2020-07-27 14:45 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.snappy.carbondata
 -rw-r--r-- 3 dc_cbss dc_cbss 54710 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.snappy.carbondata
 -rw-r--r-- 3 dc_cbss dc_cbss 38684 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.snappy.carbondata
 -rw-r--r-- 3 dc_cbss dc_cbss 55229 2020-07-27 14:46 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.snappy.carbondata

 

But there is no stage_data directory, and the data is not moved to stage_data when the Flink app 
commits.

I debugged the code and found that this method in CarbonWriter.java causes the problem:

protected StageInput uploadSegmentDataFiles(final String localPath, final 
String remotePath) {

if (!this.table.isHivePartitionTable()) {
 final *{color:#ff0000}File[] files = new File(localPath).listFiles();{color}*
 if (files == null)

{ LOGGER.error("files is null" ); return null; }

Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
 for (File file : files) {
 fileNameMapLength.put(file.getName(), file.length());
 if (LOGGER.isDebugEnabled())

{ LOGGER.debug( "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath 
+ "] start."); }

try

{ CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath, 1024); }

catch (CarbonDataWriterException exception)

{ LOGGER.error(exception.getMessage(), exception); throw exception; }

if (LOGGER.isDebugEnabled())

{ LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath 
+ "] end."); }

}
 return new StageInput(remotePath, fileNameMapLength);
 } else {
 final List<StageInput.PartitionLocation> partitionLocationList = new 
ArrayList<>();
 final List<String> partitions = new ArrayList<>();
 uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, 
partitions);
 if (partitionLocationList.isEmpty())

{ return null; }

else

{ return new StageInput(remotePath, partitionLocationList); }

}

 The local path is an HDFS path, which java.io.File cannot list, so listFiles() returns null and {color:#ff0000}files is null;{color}

  was:
[https://github.com/apache/carbondata/blob/master/docs/flink-integration-guide.md]
 i work with this ,use spark sql create carbondata table and i can see 
 -rw-r--r-- 3 hadoop dc_cbss 2650 2020-07-25 21:06 
hdfs://beh/user/dc_cbss/warehouse/testyu.db/userpolicy/Metadata/schema

then i write flink app and run with yarn;

it work i can see carbonfile in my code defined directory ;

val dataTempPath = "hdfs://beh/user/dc_cbss/temp/"

[dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls hdfs://beh/user/dc_cbss/temp/
Found 10 items
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:35 
hdfs://beh/user/dc_cbss/temp/359a873ec9624623af9beae18b630fde
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:44 
hdfs://beh/user/dc_cbss/temp/372f6065515e41a5b1d5e01af0a78d61
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 
hdfs://beh/user/dc_cbss/temp/3735b94780484f96b211ff6d6974ce3a
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:38 
hdfs://beh/user/dc_cbss/temp/8411793f4c5547dc930aacaeea3177cd
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:29 
hdfs://beh/user/dc_cbss/temp/915ff23f0d9e4c2dab699d1dcc5a8b4e
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:32 
hdfs://beh/user/dc_cbss/temp/bea0bef07d5f47cd92541c69b16aa64e
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:26 
hdfs://beh/user/dc_cbss/temp/c42c760144da4f9d83104af270ed46c1
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:41 
hdfs://beh/user/dc_cbss/temp/d8af69e47a5844a3a8ed7090ea13a278
drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 
hdfs://beh/user/dc_cbss/temp/db6dceb913444c92a3453903fb50f486
[dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/
Found 8 items
-rw-r--r-- 3 dc_cbss dc_cbss 3100 2020-07-27 14:45 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.carbonindex
-rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.carbonindex
-rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.carbonindex
-rw-r--r-- 3 dc_cbss dc_cbss 3110 2020-07-27 14:46 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.carbonindex
-rw-r--r-- 3 dc_cbss dc_cbss 54526 2020-07-27 14:45 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.snappy.carbondata
-rw-r--r-- 3 dc_cbss dc_cbss 54710 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.snappy.carbondata
-rw-r--r-- 3 dc_cbss dc_cbss 38684 2020-07-27 14:47 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.snappy.carbondata
-rw-r--r-- 3 dc_cbss dc_cbss 55229 2020-07-27 14:46 
hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.snappy.carbondata

 

but there no stage_data directory and data not mv to stage_data when flink app 
commit;

i debug code find in CarbonWriter.java file protected StageInput 
uploadSegmentDataFiles(final String localPath, final String remotePath) {

if (!this.table.isHivePartitionTable()) {
 final *{color:#FF0000}File[] files = new File(localPath).listFiles();{color}*
 if (files == null) {
 LOGGER.error("files is null" );
 return null;
 }
 Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
 for (File file : files) {
 fileNameMapLength.put(file.getName(), file.length());
 if (LOGGER.isDebugEnabled()) {
 LOGGER.debug(
 "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start.");
 }
 try {
 CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath, 1024);
 } catch (CarbonDataWriterException exception) {
 LOGGER.error(exception.getMessage(), exception);
 throw exception;
 }
 if (LOGGER.isDebugEnabled()) {
 LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + 
"] end.");
 }
 }
 return new StageInput(remotePath, fileNameMapLength);
} else {
 final List<StageInput.PartitionLocation> partitionLocationList = new 
ArrayList<>();
 final List<String> partitions = new ArrayList<>();
 uploadSegmentDataFiles(new File(localPath), remotePath, partitionLocationList, 
partitions);
 if (partitionLocationList.isEmpty()) {
 return null;
 } else {
 return new StageInput(remotePath, partitionLocationList);
 }
}

 the local path is a hdfs file so {color:#FF0000}files is null ;{color}


> flink-integration i find it can't move file to stage_data directory 
> --------------------------------------------------------------------
>
>                 Key: CARBONDATA-3926
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3926
>             Project: CarbonData
>          Issue Type: Bug
>          Components: flink-integration
>    Affects Versions: 2.0.0, 2.0.1
>         Environment: my hadoop is cdh-5.16.1 and spark 2.3.3, flink 
> 1.10.1,hive 1.1.0
>            Reporter: yutao
>            Priority: Critical
>             Fix For: 2.1.0
>
>
> [https://github.com/apache/carbondata/blob/master/docs/flink-integration-guide.md]
>  I followed this guide and used Spark SQL to create a CarbonData table; I can see its schema file:
>  -rw-r--r-- 3 hadoop dc_cbss 2650 2020-07-25 21:06 
> hdfs://beh/user/dc_cbss/warehouse/testyu.db/userpolicy/Metadata/schema
> Then I wrote a Flink app and ran it on YARN.
> It works: I can see the carbon files in the directory defined in my code:
> val dataTempPath = "hdfs://beh/user/dc_cbss/temp/"
> [dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls hdfs://beh/user/dc_cbss/temp/
>  Found 10 items
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:47 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:35 
> hdfs://beh/user/dc_cbss/temp/359a873ec9624623af9beae18b630fde
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:44 
> hdfs://beh/user/dc_cbss/temp/372f6065515e41a5b1d5e01af0a78d61
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 
> hdfs://beh/user/dc_cbss/temp/3735b94780484f96b211ff6d6974ce3a
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:38 
> hdfs://beh/user/dc_cbss/temp/8411793f4c5547dc930aacaeea3177cd
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:29 
> hdfs://beh/user/dc_cbss/temp/915ff23f0d9e4c2dab699d1dcc5a8b4e
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:32 
> hdfs://beh/user/dc_cbss/temp/bea0bef07d5f47cd92541c69b16aa64e
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:26 
> hdfs://beh/user/dc_cbss/temp/c42c760144da4f9d83104af270ed46c1
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:41 
> hdfs://beh/user/dc_cbss/temp/d8af69e47a5844a3a8ed7090ea13a278
>  drwxr-xr-x - dc_cbss dc_cbss 0 2020-07-27 14:50 
> hdfs://beh/user/dc_cbss/temp/db6dceb913444c92a3453903fb50f486
>  [dc_cbss@hive_client_004 yutao]$ hdfs dfs -ls 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/
>  Found 8 items
>  -rw-r--r-- 3 dc_cbss dc_cbss 3100 2020-07-27 14:45 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.carbonindex
>  -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.carbonindex
>  -rw-r--r-- 3 dc_cbss dc_cbss 3104 2020-07-27 14:47 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.carbonindex
>  -rw-r--r-- 3 dc_cbss dc_cbss 3110 2020-07-27 14:46 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.carbonindex
>  -rw-r--r-- 3 dc_cbss dc_cbss 54526 2020-07-27 14:45 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-24b93d2ffbc14472b3c0e3d2cd948632_batchno0-0-null-1595831979508.snappy.carbondata
>  -rw-r--r-- 3 dc_cbss dc_cbss 54710 2020-07-27 14:47 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-2da284a3beed4c15a3b60c7849d2da92_batchno0-0-null-1595832075416.snappy.carbondata
>  -rw-r--r-- 3 dc_cbss dc_cbss 38684 2020-07-27 14:47 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-70b01854c2d446889b91d4bc9203587c_batchno0-0-null-1595832123015.snappy.carbondata
>  -rw-r--r-- 3 dc_cbss dc_cbss 55229 2020-07-27 14:46 
> hdfs://beh/user/dc_cbss/temp/33976d2f23344768b91c6ba3eadd22c8/part-0-aae80851ef534c9ca6f95669d56ec636_batchno0-0-null-1595832028966.snappy.carbondata
>  
> But there is no stage_data directory, and the data is not moved to stage_data when the Flink 
> app commits.
> I debugged the code and found that this method in CarbonWriter.java causes the problem:
> protected StageInput uploadSegmentDataFiles(final String localPath, final 
> String remotePath) {
> if (!this.table.isHivePartitionTable()) {
>  final *{color:#ff0000}File[] files = new File(localPath).listFiles();{color}*
>  if (files == null)
> { LOGGER.error("files is null" ); return null; }
> Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
>  for (File file : files) {
>  fileNameMapLength.put(file.getName(), file.length());
>  if (LOGGER.isDebugEnabled())
> { LOGGER.debug( "Upload file[" + file.getAbsolutePath() + "] to [" + 
> remotePath + "] start."); }
> try
> { CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
> remotePath, 1024); }
> catch (CarbonDataWriterException exception)
> { LOGGER.error(exception.getMessage(), exception); throw exception; }
> if (LOGGER.isDebugEnabled())
> { LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
> remotePath + "] end."); }
> }
>  return new StageInput(remotePath, fileNameMapLength);
>  } else {
>  final List<StageInput.PartitionLocation> partitionLocationList = new 
> ArrayList<>();
>  final List<String> partitions = new ArrayList<>();
>  uploadSegmentDataFiles(new File(localPath), remotePath, 
> partitionLocationList, partitions);
>  if (partitionLocationList.isEmpty())
> { return null; }
> else
> { return new StageInput(remotePath, partitionLocationList); }
> }
>  The local path is an HDFS path, which java.io.File cannot list, so listFiles() returns null and {color:#ff0000}files is null;{color}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to