[structured-streaming][parquet] readStream files order in Parquet

2018-06-14 Thread karthikjay
My parquet files are first partitioned by environment and then by date, like:
  env=testing/
    date=2018-03-04/
      part1.parquet
      part2.parquet
      part3.parquet
    date=2018-03-05/
      part1.parquet
      part2.parquet
      part3.parquet
    date=2018-03-06/
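
For reference, a minimal sketch of reading such a layout with the file-source stream; the root path, schema field, and maxFilesPerTrigger value are assumptions for illustration. As far as I can tell, the file source picks up new files in modification-time order, not by the partition values in the path:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types._

  val spark = SparkSession.builder.appName("ParquetStreamSketch").getOrCreate()

  // File sources require an explicit schema for streaming reads; the partition
  // columns (env, date) are inferred from the directory names.
  val schema = new StructType()
    .add("value", StringType)            // hypothetical data column

  val parquetStream = spark.readStream
    .schema(schema)
    .option("maxFilesPerTrigger", "10")  // limits how many new files each micro-batch reads
    .parquet("/data/root")               // hypothetical root directory above env=.../date=...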

Re: [Beginner][StructuredStreaming] Using Spark aggregation - WithWatermark on old data

2018-05-24 Thread karthikjay
My data looks like this:
  { "ts2" : "2018/05/01 00:02:50.041", "serviceGroupId" : "123", "userId" : "avv-0", "stream" : "", "lastUserActivity" : "00:02:50", "lastUserActivityCount" : "0" }
  { "ts2" : "2018/05/01 00:09:02.079", "serviceGroupId" : "123", "userId" : "avv-0",
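
One detail that often matters with data like this: ts2 arrives as a string, and withWatermark needs a real timestamp column. A hedged sketch of the conversion, where the column name and format are taken from the sample and "jsonDF" is a hypothetical DataFrame already parsed from the payload:

  import org.apache.spark.sql.functions._

  // Convert the string field to TimestampType before applying the watermark.
  val withTs = jsonDF
    .withColumn("ts2", to_timestamp(col("ts2"), "yyyy/MM/dd HH:mm:ss.SSS"))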

[Beginner][StructuredStreaming] Using Spark aggregation - WithWatermark on old data

2018-05-23 Thread karthikjay
I am doing the following aggregation on the data:
  val channelChangesAgg = tunerDataJsonDF
    .withWatermark("ts2", "10 seconds")
    .groupBy(window(col("ts2"), "10 seconds"), col("env"), col("servicegroupid"))
    .agg(count("linetransactionid") as
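
A hedged sketch of how the full pipeline might look end to end; the aggregation comes from the thread, while the alias, sink, and options below are assumptions. The watermark behavior explains the "old data" symptom: in append mode a window is emitted only after the watermark passes its end, so replaying data that is already far behind the watermark can produce no output at all:

  import org.apache.spark.sql.functions._

  val channelChangesAgg = tunerDataJsonDF
    .withWatermark("ts2", "10 seconds")
    .groupBy(window(col("ts2"), "10 seconds"), col("env"), col("servicegroupid"))
    .agg(count("linetransactionid").as("changes"))   // "changes" alias is hypothetical

  val query = channelChangesAgg.writeStream
    .outputMode("append")        // finalized windows only, once the watermark moves past them
    .format("console")
    .option("truncate", "false")
    .start()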

Re: [structured-streaming]How to reset Kafka offset in readStream and read from beginning

2018-05-23 Thread karthikjay
Chris, thank you for responding. I get it. But if I am using a console sink without a checkpoint location, I do not see any messages in the console in the IntelliJ IDEA IDE. I do not explicitly specify checkpointLocation in this case. How do I clear the working directory data and force Spark to read
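
For reference, a hedged sketch of the usual way to force a re-read from the beginning: set startingOffsets to earliest and delete (or change) the checkpoint directory, since offsets recorded in an existing checkpoint take precedence over that option. The broker, topic, and path below are placeholders:

  val kafkaDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")
    .option("subscribe", "testtopic")
    .option("startingOffsets", "earliest")   // only honored when no checkpoint exists yet
    .load()

  // With a console sink and no explicit checkpointLocation, Spark creates a temporary
  // checkpoint directory; pinning it to a known path and deleting that path between runs
  // is what actually resets the query's progress.
  val query = kafkaDF.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/reset-me")   // hypothetical path; remove it to start over
    .start()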

[Beginner][StructuredStreaming] Console sink is not working as expected

2018-05-22 Thread karthikjay
I have the following code to read and process Kafka data using Structured Streaming:
  object ETLTest {
    case class record(value: String, topic: String)

    def main(args: Array[String]): Unit = {
      run();
    }

    def run(): Unit = {
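
A hedged, minimal shape for a console-sink query that does print output; the broker and topic are placeholders, and awaitTermination is the piece most often missing when nothing shows up in an IDE run because the driver exits before the first micro-batch:

  import org.apache.spark.sql.SparkSession

  object ETLTest {
    case class record(value: String, topic: String)

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder.appName("ETLTest").master("local[*]").getOrCreate()
      import spark.implicits._

      val kafkaDF = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host:9092")   // hypothetical broker
        .option("subscribe", "testtopic")                  // hypothetical topic
        .load()

      val records = kafkaDF.selectExpr("CAST(value AS STRING)", "topic").as[record]

      val query = records.writeStream
        .format("console")
        .outputMode("append")
        .start()

      query.awaitTermination()   // keeps the driver alive so batches actually print
    }
  }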

[structured-streaming]How to reset Kafka offset in readStream and read from beginning

2018-05-22 Thread karthikjay
I have the following readStream in Spark Structured Streaming reading data from Kafka:
  val kafkaStreamingDF = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "...")
    .option("subscribe", "testtopic")
    .option("failOnDataLoss", "false")
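
Continuing the truncated snippet, a hedged sketch of the remaining boilerplate, with startingOffsets shown since that is what controls where a fresh query (one without a checkpoint) begins; the cast columns are the usual pattern, not something quoted from the thread:

  val kafkaStreamingDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "...")
    .option("subscribe", "testtopic")
    .option("failOnDataLoss", "false")
    .option("startingOffsets", "earliest")   // default for streaming queries is "latest"
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")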

Spark job terminated without any errors

2018-05-18 Thread karthikjay
We have created multiple Spark jobs (as fat JARs) and run them using spark-submit under nohup. Most of the jobs quit after a while. We combed the logs for failures, but the only message that gave us some clue was "18/05/07 18:31:38 INFO Worker: Executor app-20180507180436-0016/0

[structured-streaming] foreachPartition alternative in structured streaming.

2018-05-17 Thread karthikjay
I am reading data from Kafka using Structured Streaming and I need to save the data to InfluxDB. In the regular DStreams-based approach I did this as follows:
  val messages: DStream[(String, String)] = kafkaStream.map(record => (record.topic, record.value))
  messages.foreachRDD { rdd =>
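
In Structured Streaming (2.3-era) the closest equivalent is a ForeachWriter, which gives per-partition open/process/close hooks much like foreachPartition; foreachBatch only arrived later, in 2.4. A hedged sketch, with the InfluxDB client calls left abstract since the actual client API is not shown in the thread and "kafkaDF" is a hypothetical name for the streaming DataFrame:

  import org.apache.spark.sql.{ForeachWriter, Row}

  val influxWriter = new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = {
      // acquire an InfluxDB client/connection here, once per partition per batch
      true
    }
    def process(row: Row): Unit = {
      // convert the row to a point and write it (actual client call not shown)
    }
    def close(errorOrNull: Throwable): Unit = {
      // flush and release the connection
    }
  }

  val query = kafkaDF.writeStream.foreach(influxWriter).start()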

[structured-streaming][kafka] Will the Kafka readstream timeout after connections.max.idle.ms 540000 ms ?

2018-05-15 Thread karthikjay
Hi all, we are running into a scenario where the structured streaming job exits after a while, specifically when the Kafka topic is not getting any data. From the job logs, I see connections.max.idle.ms = 540000. Does that mean the Spark readStream will close when it does not get data
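
If the suspicion is that this idle timeout is involved, Kafka consumer properties can be forwarded to the source with a "kafka." prefix on the option name; a hedged example, where the broker, topic, and the chosen value are illustrative only:

  val kafkaDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")
    .option("subscribe", "testtopic")
    .option("kafka.connections.max.idle.ms", "540000")  // forwarded to the underlying consumer
    .load()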

[Structured-Streaming][Beginner] Out of order messages with Spark kafka readstream from a specific partition

2018-05-09 Thread karthikjay
On the producer side, I make sure data for a specific user lands on the same partition. On the consumer side, I use a regular Spark Kafka readStream and read the data. I also use a console write stream to print out the Spark Kafka DataFrame. What I observe is that the data for a specific user (even

[beginner][StructuredStreaming] Null pointer exception - possible serialization errors.

2018-05-06 Thread karthikjay
I am getting a null pointer exception when trying to implement a connection pooling mechanism in Apache Spark. Any help appreciated. https://stackoverflow.com/questions/50205650/spark-connection-pooling-is-this-the-right-approach
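
A common way to avoid this kind of NPE/serialization trouble is to keep the connection (or pool) in a singleton object so it is created lazily on each executor JVM rather than shipped from the driver inside the task closure. A hedged sketch; the JDBC client and URL are placeholders standing in for whatever client the job actually uses:

  object ConnectionPool {
    // Initialized lazily, once per executor JVM, the first time a task touches it;
    // only the object's name is captured by the closure, never the connection itself.
    lazy val connection: java.sql.Connection =
      java.sql.DriverManager.getConnection("jdbc:postgresql://host/db")  // placeholder URL/driver
  }

  // Inside foreachPartition or a ForeachWriter, refer to ConnectionPool.connection.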

[Structured Streaming] [Kafka] How to repartition the data and distribute the processing among worker nodes

2018-04-20 Thread karthikjay
Any help appreciated. Please find the question in the link: https://stackoverflow.com/questions/49951022/spark-structured-streaming-with-kafka-how-to-repartition-the-data-and-distribu

[Structured Streaming][Kafka] For a Kafka topic with 3 partitions, how does the parallelism work ?

2018-04-20 Thread karthikjay
I have the following code to read data from a Kafka topic using Structured Streaming. The topic has 3 partitions:
  val spark = SparkSession
    .builder
    .appName("TestPartition")
    .master("local[*]")
    .getOrCreate()

  import spark.implicits._

  val dataFrame = spark
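
For context, a hedged sketch of the rest of such a read. By default the Kafka source creates one input partition per topic partition, so a 3-partition topic gives at most 3 parallel tasks per micro-batch unless the data is repartitioned afterwards; the broker and topic names are placeholders:

  val dataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")
    .option("subscribe", "three-partition-topic")
    .load()

  // One task per Kafka partition by default; add .repartition(n) after load()
  // to spread the downstream processing across more tasks.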

Writing record once after the watermarking interval in Spark Structured Streaming

2018-03-29 Thread karthikjay
I have the following query:
  val ds = dataFrame
    .filter(! $"requri".endsWith(".m3u8"))
    .filter(! $"bserver".contains("trimmer"))
    .withWatermark("time", "120 seconds")
    .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
    .agg(sum("bytes")/100
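
A hedged completion of that query: in append output mode each window's single, final row is emitted only after the watermark moves past the window end, which matches the "write once after the watermarking interval" goal. The alias and console sink are assumptions; dataFrame and spark come from the thread:

  import org.apache.spark.sql.functions._
  import spark.implicits._

  val ds = dataFrame
    .filter(! $"requri".endsWith(".m3u8"))
    .filter(! $"bserver".contains("trimmer"))
    .withWatermark("time", "120 seconds")
    .groupBy(window(col("time"), "60 seconds"), col("channelName"))
    .agg((sum("bytes") / 100).as("hundredBytes"))   // hypothetical alias

  val query = ds.writeStream
    .outputMode("append")   // each window is written once, after the watermark passes it
    .format("console")
    .start()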

[Structured Streaming][Parquet] How do I specify partition and data when saving to Parquet

2018-03-02 Thread karthikjay
My DataFrame has the following schema:
  root
   |-- data: struct (nullable = true)
   |    |-- zoneId: string (nullable = true)
   |    |-- deviceId: string (nullable = true)
   |    |-- timeSinceLast: long (nullable = true)
   |-- date: date (nullable = true)
How can I do a writeStream with Parquet format
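
A hedged sketch of a Parquet sink partitioned by the top-level date column from that schema; the output and checkpoint paths are placeholders, and "dataFrame" stands for the streaming DataFrame above:

  val query = dataFrame.writeStream
    .format("parquet")
    .partitionBy("date")                                    // writes date=.../ subdirectories
    .option("path", "/output/parquet")                      // placeholder output directory
    .option("checkpointLocation", "/output/_checkpoints")   // required for file sinks
    .outputMode("append")
    .start()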

[Structured Streaming] Handling Kafka Stream messages with different JSON Schemas.

2018-02-28 Thread karthikjay
Hi all, I have the following code to stream Kafka data, apply a schema called "hbSchema" on it, and then act on the data:
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "10.102.255.241:9092")
    .option("subscribe", "mabr_avro_json1")
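
One hedged approach for mixed schemas on a single topic: parse the raw value with from_json against each candidate schema and branch on a field that only one of the schemas populates (from_json leaves absent fields null). hbSchema comes from the thread; the second schema and the discriminating field name are assumptions:

  import org.apache.spark.sql.functions._

  val parsed = df
    .selectExpr("CAST(value AS STRING) AS json")
    .withColumn("hb", from_json(col("json"), hbSchema))
    .withColumn("other", from_json(col("json"), otherSchema))   // hypothetical second schema

  // A field only heartbeat messages carry distinguishes the two shapes (name is hypothetical).
  val heartbeats = parsed.filter(col("hb.heartbeatId").isNotNull).select("hb.*")
  val others     = parsed.filter(col("hb.heartbeatId").isNull).select("other.*")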