Hello,
Code:

ZkState zkState = new ZkState(kafkaConfig);
DynamicBrokersReader kafkaBrokerReader = new DynamicBrokersReader(kafkaConfig, zkState);
int partitionCount = kafkaBrokerReader.getNumPartitions();

SparkConf _sparkConf = new SparkConf().setAppName("KafkaReceiver");
final JavaStreamingContext ssc = new JavaStreamingContext(_sparkConf, new Duration(1000));

// DStream of JSON strings, one record per Kafka message
final JavaReceiverInputDStream<String> inputStream =
    ssc.receiverStream(new KafkaReceiver(_props, partitionCount));

final JavaSQLContext sqlContext =
    new org.apache.spark.sql.api.java.JavaSQLContext(ssc.sparkContext());

inputStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) throws Exception {
        if (rdd != null) {
            // Infer the schema from the JSON records and write this batch as Parquet
            JavaSchemaRDD schemaObject = sqlContext.jsonRDD(rdd);
            schemaObject.saveAsParquetFile("tweet" + time.toString() + ".parquet");
            System.out.println("File Saved Successfully");
        } else {
            System.out.println("rdd is empty");
        }
        return null;
    }
});

ssc.start();
ssc.awaitTermination();
Error:

java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:806)
        at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:40)
        at org.apache.spark.sql.api.java.JavaSQLContext.jsonRDD(JavaSQLContext.scala:123)
        at consumer.kafka.client.Consumer$1.call(Consumer.java:115)
        at consumer.kafka.client.Consumer$1.call(Consumer.java:109)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
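From the trace it looks like jsonRDD infers the schema by running a reduce over the records of the batch, so it fails with "empty collection" whenever a batch interval produces an RDD that is non-null but has no records; my rdd != null check does not catch that case. Would guarding on the record count before calling jsonRDD, roughly as in the sketch below (reusing the variables from the code above), be the right way to handle empty batches, or is there a better approach?

if (rdd != null && rdd.count() > 0) {
    // count() triggers a job, but it avoids the failing reduce inside schema inference
    JavaSchemaRDD schemaObject = sqlContext.jsonRDD(rdd);
    schemaObject.saveAsParquetFile("tweet" + time.toString() + ".parquet");
    System.out.println("File Saved Successfully");
} else {
    System.out.println("rdd is empty, skipping this batch");
}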
Please suggest a solution.
Thanks in Advance.
On Thu, Aug 7, 2014 at 1:06 AM, Tathagata Das wrote:
> You can use Spark SQL for that very easily. You can take the RDDs you
> get from the Kafka input stream, convert them to RDDs of case classes,
> and save them as Parquet files.
> More information here.
>
> https://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
>
>
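For reference, my reading of that case-class suggestion translated to Java would be a JavaBean plus applySchema, roughly as sketched below; Tweet and parseTweet are only placeholders for my actual JSON fields and parsing, and the snippet reuses rdd, sqlContext, and time from the code above:

// Placeholder JavaBean standing in for a Scala case class
public static class Tweet implements java.io.Serializable {
    private String id;
    private String text;
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getText() { return text; }
    public void setText(String text) { this.text = text; }
}

// Inside foreachRDD: map the raw JSON strings to Tweet beans,
// apply the bean schema by reflection, and write the batch out as Parquet
JavaRDD<Tweet> tweets = rdd.map(new Function<String, Tweet>() {
    @Override
    public Tweet call(String json) throws Exception {
        return parseTweet(json); // placeholder JSON-to-bean parser
    }
});
JavaSchemaRDD schemaTweets = sqlContext.applySchema(tweets, Tweet.class);
schemaTweets.saveAsParquetFile("tweet" + time.toString() + ".parquet");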
> On Wed, Aug 6, 2014 at 5:23 AM, Mahebub Sayyed wrote:
>
>> Hello,
>>
>> I have referred to the link https://github.com/dibbhatt/kafka-spark-consumer
>> and have successfully consumed tuples from Kafka.
>> The tuples are JSON objects, and I want to store those objects in HDFS in
>> Parquet format.
>>
>> Please suggest a sample program for that.
>> Thanks in advance.
>>
>>
>>
>>
>>
>> On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>
>>> You can try this Kafka Spark Consumer, which I recently wrote. It uses
>>> the Low Level Kafka Consumer.
>>>
>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>
>>> Dibyendu
>>>
>>>
>>>
>>>
>>> On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s wrote:
>>>
>>>> Hi,
>>>>
>>>> I am new to Apache Spark and trying to develop a Spark Streaming program
>>>> to *stream data from Kafka topics and write the output as Parquet files on HDFS*.
>>>> Please share a *sample reference* program that streams data from Kafka
>>>> topics and writes the output as Parquet files on HDFS.
>>>> Thanks in advance.
>>>>
>>>> Regards,
>>>> Rafeeq S
>>>> *(“What you do is what matters, not what you think or say or plan.”)*