Re: Data skipped while writing Spark Streaming output to HDFS

2015-10-12 Thread Shixiong Zhu
Could you print the content of RDD to check if there are multiple values
for a key in a batch?
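For a small batch, one way to do that check is to collect the pair RDD to the driver and count values per key. The snippet below is a standalone sketch (not from the original thread): the plain `List` of entries stands in for the result of `pairRDD.collect()`, and `countPerKey` is a hypothetical helper name.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchKeyCount {
    // Count how many values each key has in one batch; the List here
    // stands in for the result of pairRDD.collect() on the driver.
    static Map<String, Integer> countPerKey(List<Map.Entry<String, String>> batch) {
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, String> kv : batch) {
            counts.merge(kv.getKey(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch = Arrays.asList(
                new SimpleEntry<>("Key_1", "183.33 70.0 0.12"),
                new SimpleEntry<>("Key_1", "184.33 70.0 1.12"),
                new SimpleEntry<>("Key_1", "181.33 70.0 2.12"));
        // A count greater than 1 confirms the batch really carries
        // multiple rows for the key before the write happens.
        System.out.println(countPerKey(batch));
    }
}
```

If the counts are greater than 1 here but only one row reaches HDFS, the loss is happening in the write path rather than upstream.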

Best Regards,
Shixiong Zhu

2015-10-12 18:25 GMT+08:00 Sathiskumar :

> I'm running a Spark Streaming application for every 10 seconds, its job is
> to
> consume data from kafka, transform it and store it into HDFS based on the
> key. i.e, a file per unique key. I'm using the Hadoop's saveAsHadoopFile()
> API to store the output, I see that a file gets generated for every unique
> key, but the issue is that only one row gets stored for each of the unique
> key though the DStream has more rows for the same key.
>
> For example, consider the following DStream which has one unique key,
>
> *  key  value*
>  =   ===
>  Key_1   183.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
>  Key_1   184.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
>  Key_1   181.33 70.0 2.12 1.0 1.0 1.0 11.0 4.0
>  Key_1   185.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
>  Key_1   185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
>
> I see only one row (instead of 5 rows) gets stored in the HDFS file,
>
> 185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
>
> The following code is used to store the output into HDFS,
>
> dStream.foreachRDD(new Function, Void> () {
> @Override
> public Void call(JavaPairRDD pairRDD) throws Exception
> {
> long timestamp = System.currentTimeMillis();
> int randomInt = random.nextInt();
> pairRDD.saveAsHadoopFile("hdfs://localhost:9000/application-" +
> timestamp +"-"+ randomInt, String.class, String.class,
> RDDMultipleTextOutputFormat.class);
> }
> });
>
> where the implementation of RDDMultipleTextOutputFormat is as follows,
>
> public class RDDMultipleTextOutputFormat extends
> MultipleTextOutputFormat {
>
> public K generateActualKey(K key, V value) {
> return null;
> }
>
> public String generateFileNameForKeyValue(K key, V value, String name)
> {
> return key.toString();
> }
> }
>
> Please let me know if I'm missing anything? Thanks for your help.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Data-skipped-while-writing-Spark-Streaming-output-to-HDFS-tp25026.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Data skipped while writing Spark Streaming output to HDFS

2015-10-12 Thread Sathiskumar
I'm running a Spark Streaming application with a 10-second batch interval. Its job is to
consume data from Kafka, transform it, and store it into HDFS based on the
key, i.e., one file per unique key. I'm using Hadoop's saveAsHadoopFile()
API to store the output. I see that a file gets generated for every unique
key, but the issue is that only one row gets stored for each unique
key, even though the DStream has more rows for the same key.

For example, consider the following DStream which has one unique key,

  key     value
  =====   ==========================================
  Key_1   183.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
  Key_1   184.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
  Key_1   181.33 70.0 2.12 1.0 1.0 1.0 11.0 4.0
  Key_1   185.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
  Key_1   185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0

I see that only one row (instead of all 5 rows) gets stored in the HDFS file:

185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0

The following code is used to store the output into HDFS (the generics were
mangled by the archive; restored here, and the missing return added since
call() must return a Void):

dStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> pairRDD) throws Exception {
        long timestamp = System.currentTimeMillis();
        int randomInt = random.nextInt();
        pairRDD.saveAsHadoopFile(
                "hdfs://localhost:9000/application-" + timestamp + "-" + randomInt,
                String.class, String.class, RDDMultipleTextOutputFormat.class);
        return null;
    }
});

where the implementation of RDDMultipleTextOutputFormat is as follows
(type parameters restored):

public class RDDMultipleTextOutputFormat<K, V>
        extends MultipleTextOutputFormat<K, V> {

    @Override
    public K generateActualKey(K key, V value) {
        return null; // suppress the key in the written records
    }

    @Override
    public String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); // one output file per unique key
    }
}
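One property of this naming scheme may be worth checking: generateFileNameForKeyValue ignores the name argument (the per-task leaf name such as "part-00000"), so every partition that holds rows for a key resolves to the same file name. The standalone sketch below illustrates that collision; filenameFor is a hypothetical stand-in for the method above, and the part-0000N strings mimic the names Hadoop passes in.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FileNameCheck {
    // Stand-in for generateFileNameForKeyValue above: the partition's
    // default leaf name ("part-00000", "part-00001", ...) is ignored
    // and only the key determines the output file name.
    static String filenameFor(String key, String partName) {
        return key; // same effect as returning key.toString()
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("part-00000", "part-00001", "part-00002");
        Set<String> files = new HashSet<>();
        for (String part : partitions) {
            files.add(filenameFor("Key_1", part));
        }
        // All three partitions resolve to the single file name "Key_1",
        // so tasks writing the same key can end up clobbering each
        // other's output instead of appending to it.
        System.out.println(files);
    }
}
```

If that clobbering is the cause here, distinct rows for the same key landing in different partitions would leave only one task's rows visible, which would match the single surviving row described above.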

Please let me know if I'm missing anything. Thanks for your help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Data-skipped-while-writing-Spark-Streaming-output-to-HDFS-tp25026.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org