Re: Reading kafka and save to parquet problem

2018-03-07 Thread Junfeng Chen
I have tried using readStream and writeStream, but it throws an "Uri
without authority: hdfs:/data/_spark_metadata" exception, which does not
appear in normal read mode.
The parquet path I specified is hdfs:///data
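
One workaround I have seen suggested for this error is to give the sink a
fully-qualified URI with an explicit authority, since hdfs:///data can be
collapsed into the authority-less hdfs:/data when the file sink derives its
_spark_metadata path. A minimal sketch, assuming df is the streaming
DataFrame from readStream and nn-host:8020 stands in for the real NameNode
address:

val query = df.writeStream
  .format("parquet")
  // Explicit authority instead of hdfs:///data; a plain "/data" path that
  // resolves against fs.defaultFS also avoids the malformed metadata URI.
  .option("path", "hdfs://nn-host:8020/data")
  .option("checkpointLocation", "hdfs://nn-host:8020/checkpoints/data")
  .start()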


Regards,
Junfeng Chen

On Thu, Mar 8, 2018 at 9:38 AM, naresh Goud wrote:

> Change it to readStream instead of read, as below:
>
> val df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
>
>
> Check if this helps:
>
> https://github.com/ndulam/KafkaSparkStreams/blob/master/SampleStreamApp/src/main/scala/com/naresh/org/SensorDataSave.scala
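>
> To actually keep the query running, the read side above also needs a
> writeStream sink with a checkpoint location. A minimal sketch (the paths
> and trigger interval below are placeholders, not taken from the linked
> example):
>
> import org.apache.spark.sql.streaming.Trigger
>
> val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .writeStream
>   .format("parquet")
>   .option("path", "/data/parquet")                  // placeholder sink path
>   .option("checkpointLocation", "/data/checkpoint") // required by file sinks
>   .trigger(Trigger.ProcessingTime("60 seconds"))    // one micro-batch per minute
>   .start()
>
> query.awaitTermination() // block so the streaming query keeps running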
>
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>
>
> On Wed, Mar 7, 2018 at 7:33 PM Junfeng Chen wrote:
>
>> I am struggling to read data from Kafka and save it to Parquet files on
>> HDFS using Spark Streaming, following this post:
>> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>>
>> My code is similar to the following:
>>
>> val df = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>   .option("subscribe", "topic1")
>>   .load()
>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>>   .as[(String, String)]
>>   .write.parquet("hdfs://data.parquet")
>>
>>
>> The difference is that I am writing it in Java.
>>
>> In practice, this code runs only once and then exits gracefully.
>> Although it produces the Parquet file successfully and no exception is
>> thrown, it behaves like a normal Spark program rather than a Spark
>> Streaming program.
>>
>> What should I do if I want to read from Kafka and save the data to
>> Parquet in batches?
>>
>> Regards,
>> Junfeng Chen
>>
>

