My Parquet files are partitioned first by environment and then by date, like:

env=testing/
    date=2018-03-04/
        part1.parquet
        part2.parquet
        part3.parquet
    date=2018-03-05/
        part1.parquet
        part2.parquet
        part3.parquet
    date=2018-03-06/
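With this layout, Spark's partition discovery should expose env and date as
ordinary columns when reading from the directory above env=... (the root path
below is hypothetical):

// Reading from the root directory lets Spark infer env and date
// as partition columns alongside the columns stored in the files.
val df = spark.read.parquet("/data/tuner")  // hypothetical root path
df.printSchema()  // schema will include env and date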
My data looks like this:

{
  "ts2" : "2018/05/01 00:02:50.041",
  "serviceGroupId" : "123",
  "userId" : "avv-0",
  "stream" : "",
  "lastUserActivity" : "00:02:50",
  "lastUserActivityCount" : "0"
}
{
  "ts2" : "2018/05/01 00:09:02.079",
  "serviceGroupId" : "123",
  "userId" : "avv-0",
I am doing the following aggregation on the data:

val channelChangesAgg = tunerDataJsonDF
  .withWatermark("ts2", "10 seconds")
  .groupBy(
    window(col("ts2"), "10 seconds"),
    col("env"),
    col("servicegroupid"))
  .agg(count("linetransactionid") as
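Since withWatermark needs ts2 to be a timestamp and the sample data shows it
as a string, here is a minimal sketch of the full aggregation that parses ts2
first; the timestamp format and the alias are assumptions:

import org.apache.spark.sql.functions._

// ts2 arrives as a string like "2018/05/01 00:02:50.041", so parse
// it to a timestamp before applying the watermark (format assumed).
val withTs = tunerDataJsonDF
  .withColumn("ts2", to_timestamp(col("ts2"), "yyyy/MM/dd HH:mm:ss.SSS"))

val channelChangesAgg = withTs
  .withWatermark("ts2", "10 seconds")
  .groupBy(window(col("ts2"), "10 seconds"), col("env"), col("servicegroupid"))
  .agg(count("linetransactionid") as "channelChanges")  // alias is hypothetical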
Chris,

Thank you for responding. I get it. But if I am using a console sink without
a checkpoint location, I do not see any messages in the console in IntelliJ
IDEA. I do not explicitly specify checkpointLocation in this case. How do I
clear the working directory data and force Spark to read
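A minimal sketch of a console sink with an explicit checkpointLocation (the
path is hypothetical); deleting that directory between runs discards the
stored offsets, so the next run re-reads according to startingOffsets:

// Explicit checkpoint location (hypothetical path). Removing this
// directory resets the stream's stored progress.
val query = df.writeStream
  .format("console")
  .option("truncate", "false")
  .option("checkpointLocation", "/tmp/console-ckpt")
  .start()

query.awaitTermination()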
I have the following code to read and process Kafka data using Structured
Streaming:

object ETLTest {

  case class record(value: String, topic: String)

  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {
I have the following readStream in Spark Structured Streaming reading data
from Kafka:

val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "testtopic")
  .option("failOnDataLoss", "false")
We have created multiple Spark jobs (as fat JARs) and run them using
spark-submit under nohup. Most of the jobs quit after a while. We tried to
comb the logs for failures, but the only message that gave us some clue
was "18/05/07 18:31:38 INFO Worker: Executor app-20180507180436-0016/0
I am reading data from Kafka using Structured Streaming and I need to save
the data to InfluxDB. In the regular DStreams-based approach I did this as
follows:

val messages: DStream[(String, String)] = kafkaStream.map(record =>
  (record.topic, record.value))

messages.foreachRDD { rdd =>
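In Structured Streaming the closest equivalent is foreachBatch (Spark 2.4+),
which hands each micro-batch over as a plain DataFrame. A minimal sketch,
where messagesDF stands for the streaming DataFrame and InfluxWriter.writeAll
is a hypothetical helper:

import org.apache.spark.sql.{DataFrame, Row}

val query = messagesDF.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.foreachPartition { rows: Iterator[Row] =>
      // hypothetical helper: opens one InfluxDB client per partition,
      // writes the rows, then closes the client
      InfluxWriter.writeAll(rows)
    }
  }
  .option("checkpointLocation", "/tmp/influx-ckpt")  // hypothetical path
  .start()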
Hi all,
We are running into a scenario where the Structured Streaming job is exiting
after a while, specifically when the Kafka topic is not getting any data.
From the job logs, I see this: connections.max.idle.ms = 54. Does that
mean the Spark readStream will close when it does not get data
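For what it's worth, that property is a Kafka consumer setting rather than a
Spark one; the Kafka source forwards any option prefixed with kafka. to the
underlying consumer, so it can be raised if the idle timeout turns out to be
the cause (the value below is illustrative):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "testtopic")
  // kafka.-prefixed options go straight to the Kafka consumer;
  // this raises the idle timeout to 10 minutes (illustrative value).
  .option("kafka.connections.max.idle.ms", "600000")
  .load()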
On the producer side, I make sure data for a specific user lands on the same
partition. On the consumer side, I use a regular Spark Kafka readStream and
read the data. I also use a console write stream to print out the Spark
Kafka DataFrame. What I observe is, the data for a specific user (even
I am getting a null pointer exception when trying to implement a connection
pooling mechanism in Apache Spark. Any help appreciated.
https://stackoverflow.com/questions/50205650/spark-connection-pooling-is-this-the-right-approach
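The usual cause of that NPE is the pool being created on the driver and then
serialized to the executors. A common remedy is a lazily initialized per-JVM
singleton; a minimal sketch, where createPool, borrowObject, returnObject,
and write are stand-ins for the real pool API (modeled on commons-pool2):

object ConnectionPool {
  // Initialized lazily, once per executor JVM, so the pool itself
  // is never serialized from the driver.
  lazy val pool = createPool()  // hypothetical factory
}

df.foreachPartition { rows: Iterator[Row] =>
  val conn = ConnectionPool.pool.borrowObject()
  try rows.foreach(row => conn.write(row))  // hypothetical write
  finally ConnectionPool.pool.returnObject(conn)
}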
Any help appreciated. Please find the question at the link:
https://stackoverflow.com/questions/49951022/spark-structured-streaming-with-kafka-how-to-repartition-the-data-and-distribu
I have the following code to read data from a Kafka topic using Structured
Streaming. The topic has 3 partitions:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("TestPartition")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val dataFrame = spark
I have the following query:

val ds = dataFrame
  .filter(!$"requri".endsWith(".m3u8"))
  .filter(!$"bserver".contains("trimmer"))
  .withWatermark("time", "120 seconds")
  .groupBy(window(dataFrame.col("time"), "60 seconds"), col("channelName"))
  .agg(sum("bytes")/100
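A complete version of the same query shape, with the aggregate closed off
and aliased (the alias name is hypothetical):

val ds = dataFrame
  .filter(!$"requri".endsWith(".m3u8"))
  .filter(!$"bserver".contains("trimmer"))
  .withWatermark("time", "120 seconds")
  .groupBy(window($"time", "60 seconds"), $"channelName")
  .agg((sum("bytes") / 100).as("scaledBytes"))  // hypothetical alias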
My DataFrame has the following schema:

root
 |-- data: struct (nullable = true)
 |    |-- zoneId: string (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- timeSinceLast: long (nullable = true)
 |-- date: date (nullable = true)

How can I do a writeStream with Parquet format?
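A minimal sketch of a Parquet sink for a streaming DataFrame df (both paths
are hypothetical); note that the file sink requires a checkpoint location:

val query = df.writeStream
  .format("parquet")
  .option("path", "/data/out/parquet")             // hypothetical output path
  .option("checkpointLocation", "/data/out/ckpt")  // required by the file sink
  .partitionBy("date")  // optional: partition output by the date column
  .start()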
Hi all,
I have the following code to stream Kafka data, apply a schema called
"hbSchema" to it, and then act on the data.

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "10.102.255.241:9092")
  .option("subscribe", "mabr_avro_json1")