Hi all,
I have a binary file made of messages that are each 57 bytes long. The
binary file contains exactly 100000 messages and its size is about 44 MB (I've
already verified that).
What I do is simply read the file via
JavaStreamingContext.binaryRecordsStream("folder", 57), which gives me a
JavaDStream<byte[]>. After that I collect it and print it into a buffer.
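To have a reference to compare the Spark output against, the same file can be split into 57-byte records locally in plain Java. This is a minimal sketch, assuming the file fits in memory; `FixedLengthReader` and `readRecords` are hypothetical names, not part of Spark. It allocates a fresh array for every record, so no two entries in the returned list share storage.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reads fixed-length records from a stream without Spark.
final class FixedLengthReader {
    static List<byte[]> readRecords(InputStream in, int recordLength) throws IOException {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        List<byte[]> records = new ArrayList<>();
        DataInputStream data = new DataInputStream(in);
        while (true) {
            byte[] record = new byte[recordLength]; // a new buffer for every record
            try {
                data.readFully(record);             // blocks until recordLength bytes are read
            } catch (EOFException e) {
                break;                              // end of stream; a trailing partial record is dropped
            }
            records.add(record);
        }
        return records;
    }
}
```

For example, `FixedLengthReader.readRecords(Files.newInputStream(Paths.get("folder", "messages.bin")), 57)`, where `messages.bin` is a placeholder file name, should return one distinct entry per message if the file is well formed.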
The problem is that only a single record is created when I set the record
length to 57 in JavaStreamingContext.binaryRecordsStream(). After that,
collect() returns exactly 1000000 records of 57 bytes each, but they are all
identical (each one contains the value of the first message of the file).
On the other hand, if I set the record length in binaryRecordsStream() to
57000000, a single record is created with both methods. I verified that it
does contain all of the messages.
If the record length is 285000 bytes, binaryRecordsStream() creates only one
record, which contains the first half of the messages, and collect() creates
two records that are identical.
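All three record lengths tried above are whole multiples of the 57-byte message size: 57 = 57 × 1, 57000000 = 57 × 1000000 and 285000 = 57 × 5000, so each record should hold 1, 1000000 and 5000 messages respectively. The small helper below (hypothetical names, for checking the arithmetic only, not part of Spark) makes that explicit and also reports how many whole records, and how many leftover bytes, a file of a given size yields.

```java
// Hypothetical arithmetic helper for fixed-length records of 57-byte messages.
final class RecordMath {
    static final int MESSAGE_LENGTH = 57;

    // Whole messages per record; the record length must be a positive
    // multiple of MESSAGE_LENGTH for messages to stay aligned.
    static long messagesPerRecord(long recordLength) {
        if (recordLength <= 0 || recordLength % MESSAGE_LENGTH != 0) {
            throw new IllegalArgumentException(
                    recordLength + " is not a positive multiple of " + MESSAGE_LENGTH);
        }
        return recordLength / MESSAGE_LENGTH;
    }

    // Whole records that fit in a file of fileSize bytes.
    static long wholeRecords(long fileSize, long recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        return fileSize / recordLength;
    }

    // Bytes left over after the last whole record (0 means the file divides evenly).
    static long leftoverBytes(long fileSize, long recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException("recordLength must be positive");
        }
        return fileSize % recordLength;
    }
}
```

A non-zero `leftoverBytes` for the real file size would be a hint that the file does not consist only of whole 57-byte messages.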
I believe the problem is in binaryRecordsStream() and that collect() only does
what it is supposed to do.
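One hypothesis worth testing (not confirmed here): if the reader behind binaryRecordsStream() fills the same byte[] for every record and hands out that array without copying it, collect() would return many references to a single array, and every entry would show the same bytes. The plain-Java simulation below, with hypothetical names, only illustrates that aliasing effect and the fix of copying each record. In this simulation every aliased entry shows the last record written, whereas the post reports the first message, so it illustrates the mechanism rather than diagnosing this exact case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of a record reader that reuses one buffer.
final class BufferReuseDemo {
    // Splits `file` into records, writing each into the SAME shared buffer.
    // copy == false: the list holds N references to that one array.
    // copy == true:  each entry is an independent clone taken right away.
    static List<byte[]> collect(byte[] file, int recordLength, boolean copy) {
        byte[] shared = new byte[recordLength];
        List<byte[]> out = new ArrayList<>();
        for (int off = 0; off + recordLength <= file.length; off += recordLength) {
            System.arraycopy(file, off, shared, 0, recordLength);
            out.add(copy ? shared.clone() : shared);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] file = {1, 1, 2, 2, 3, 3}; // three 2-byte "messages"
        for (byte[] r : collect(file, 2, false)) System.out.print(Arrays.toString(r));
        System.out.println(); // prints: [3, 3][3, 3][3, 3]
        for (byte[] r : collect(file, 2, true)) System.out.print(Arrays.toString(r));
        System.out.println(); // prints: [1, 1][2, 2][3, 3]
    }
}
```

If this is the cause, a cheap check on the Spark side would be to copy each array before collecting, for example `binary.map(b -> b.clone())` with a Java 8 lambda, and see whether the collected records become distinct. This has not been verified against the Spark version used here.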
I'm very grateful in advance for your help.
Hamza
Here is a sample of the code:
public static void main(String[] args) throws IniParserException, IOException,
        InterruptedException {
    Path input = Paths.get("folder");
    Ini ini = new Ini().read(input);
    Map<String, Map<String, String>> sections = ini.getSections();

    // get the key/values of SPARK_PARAMETERS
    Map<String, String> SPARK_PARAMETERS = sections.get("SPARK_PARAMETERS");

    // Set up the context
    SparkConf spark_conf = new SparkConf();
    spark_conf.setMaster("local[*]");
    spark_conf.setAppName("name");

    // configure the Spark parameters
    for (String optionKey : SPARK_PARAMETERS.keySet()) {
        spark_conf.set(optionKey, SPARK_PARAMETERS.get(optionKey));
    }

    // Spark Streaming configuration
    JavaSparkContext sc = new JavaSparkContext(spark_conf);

    // configure the SPARK_STREAMING parameters
    final Map<String, String> SPARK_STREAMING_PARAMETERS =
            sections.get("SPARK_STREAMING_PARAMETERS");
    JavaStreamingContext ssc = new JavaStreamingContext(sc,
            Durations.seconds(Integer.parseInt(SPARK_STREAMING_PARAMETERS.get("bash.duration"))));
    ssc.checkpoint(SPARK_STREAMING_PARAMETERS.get("checkpoint.dir") + "/checkpoint_"
            + Long.toString(System.currentTimeMillis()));

    // read a binary source
    JavaDStream<byte[]> binary = ssc.binaryRecordsStream("folder", 171);
    binary.foreachRDD(new VoidFunction<JavaRDD<byte[]>>() {
        @Override
        public void call(JavaRDD<byte[]> rdd) throws IOException {
            if (!rdd.isEmpty()) {
                // Collect
                List<byte[]> list1Structure = rdd.collect();
                // decode the messages
                for (byte[] oct : list1Structure) {
                    // read the message size (first 2 bytes)
                    byte[] data = new byte[2];
                    for (int j = 0; j < 2; j++) {
                        data[j] = oct[j];
                    }
                    long size = byteArrayToInt(data);
                    // read the signature (11 bytes starting at offset 10)
                    data = new byte[11];
                    for (int j = 0; j < 11; j++) {
                        data[j] = oct[j + 10];
                    }
                    String signature = new String(data);
                    System.out.println("size " + size + " signature " + signature);
                }
            }
        }
    });
    ssc.start();
    ssc.awaitTermination();
}
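The decoding loop above relies on a `byteArrayToInt` helper that is not shown. Below is a minimal, self-contained sketch of the same decoding, assuming the size field is an unsigned 16-bit big-endian integer and the signature is ASCII text (neither is stated in the post); `MessageDecoder` and its methods are hypothetical names. Note also that the sample passes 171 (three 57-byte messages) to binaryRecordsStream(), so each collected record holds three messages while the loop only decodes the first; `describeAll` steps through every 57-byte slice.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical decoder for the two fields read in the loop above.
// ASSUMPTIONS: big-endian unsigned 16-bit size, ASCII signature.
final class MessageDecoder {
    static final int MESSAGE_LENGTH = 57;

    // Stand-in for the unshown byteArrayToInt: bytes 0..1, big-endian, unsigned.
    static int readSize(byte[] msg) {
        return ((msg[0] & 0xFF) << 8) | (msg[1] & 0xFF);
    }

    // Bytes 10..20 inclusive (11 bytes), as in the original loop.
    static String readSignature(byte[] msg) {
        return new String(Arrays.copyOfRange(msg, 10, 21), StandardCharsets.US_ASCII);
    }

    static String describe(byte[] msg) {
        if (msg.length < 21) {
            throw new IllegalArgumentException("message too short: " + msg.length);
        }
        return "size " + readSize(msg) + " signature " + readSignature(msg);
    }

    // Decodes every whole 57-byte message packed into a longer record (e.g. 171 bytes).
    static List<String> describeAll(byte[] record) {
        List<String> out = new ArrayList<>();
        for (int off = 0; off + MESSAGE_LENGTH <= record.length; off += MESSAGE_LENGTH) {
            out.add(describe(Arrays.copyOfRange(record, off, off + MESSAGE_LENGTH)));
        }
        return out;
    }
}
```

Inside the `foreachRDD` callback, `MessageDecoder.describeAll(oct)` could replace the two manual copy loops if the byte-order and charset assumptions hold for this format.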