Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-23 Thread Jungtaek Lim
The issue looks to be fixed in
https://issues.apache.org/jira/browse/SPARK-23670, and 2.3.1 will likely
include the fix.

-Jungtaek Lim (HeartSaVioR)

On Wed, May 23, 2018 at 7:12 PM, weand <andreas.we...@gmail.com> wrote:

> Thanks for the clarification. So it really seems to be a Spark UI OOM issue.
>
> After setting:
> --conf spark.sql.ui.retainedExecutions=10
> --conf spark.worker.ui.retainedExecutors=10
> --conf spark.worker.ui.retainedDrivers=10
> --conf spark.ui.retainedJobs=10
> --conf spark.ui.retainedStages=10
> --conf spark.ui.retainedTasks=10
> --conf spark.streaming.ui.retainedBatches=10
>
> ...driver memory consumption still increases constantly over time (ending
> in OOM).
>
> TOP 10 Records by Heap Consumption:
>
> Class Name                                                     | Objects | Shallow Heap | Retained Heap
> ---------------------------------------------------------------|---------|--------------|-----------------
> org.apache.spark.status.ElementTrackingStore                   |       1 |           40 | >= 1.793.945.416
> org.apache.spark.util.kvstore.InMemoryStore                    |       1 |           24 | >= 1.793.944.760
> org.apache.spark.util.kvstore.InMemoryStore$InstanceList       |      13 |          416 | >= 1.792.311.104
> org.apache.spark.sql.execution.ui.SparkPlanGraphWrapper        |  16.472 |      527.104 | >= 1.430.379.120
> org.apache.spark.sql.execution.ui.SparkPlanGraphNodeWrapper    | 378.856 |    9.092.544 | >= 1.415.224.880
> org.apache.spark.sql.execution.ui.SparkPlanGraphNode           | 329.440 |   10.542.080 | >= 1.389.888.112
> org.apache.spark.sql.execution.ui.SparkPlanGraphClusterWrapper |  49.416 |    1.976.640 | >= 957.701.152
> org.apache.spark.sql.execution.ui.SQLExecutionUIData           |   1.000 |       64.000 | >= 344.103.096
> org.apache.spark.sql.execution.ui.SQLPlanMetric                | 444.744 |   14.231.808 | >= 14.231.808
> org.apache.spark.sql.execution.ui.SparkPlanGraphEdge           | 312.968 |   10.014.976 | >= 10.014.976
>
> There are more than 300k instances each of SparkPlanGraphNodeWrapper,
> SparkPlanGraphNode and SQLPlanMetric.
>
> BTW: we are using 2.3.0.
>
> Shall I file a new Jira for that memory leak in the Spark UI? I only found
> https://issues.apache.org/jira/browse/SPARK-15716, but that seems to be
> something different.
>
> Trying with spark.ui.enabled=false in the meantime.
>
>
> Tathagata Das wrote
> > Just to be clear, these screenshots are about the memory consumption of
> > the driver. So this has nothing to do with streaming aggregation state,
> > which is kept in the memory of the executors, not the driver.
> >
> > On Tue, May 22, 2018 at 10:21 AM, Jungtaek Lim <kabhwan@> wrote:
> >
> >> 1. Could you share your Spark version?
> >> 2. Could you reduce "spark.sql.ui.retainedExecutions" and see whether it
> >> helps? This configuration is available in 2.3.0, and default value is
> >> 1000.
> >>
> >> Thanks,
> >> Jungtaek Lim (HeartSaVioR)
> >>
> >> On Tue, May 22, 2018 at 4:29 PM, weand <andreas.weise@> wrote:
> >>
> >>> You can see it even better on this screenshot:
> >>>
> >>> TOP Entries Collapsed #2
> >>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t8542/27_001.png>
> >>>
> >>> Sorry for the spam, attached a not so perfect screen in the mail before.
> >>>
> >>>
> >>>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Jungtaek Lim
1. Could you share your Spark version?
2. Could you reduce "spark.sql.ui.retainedExecutions" and see whether it
helps? This configuration is available in 2.3.0, and default value is 1000.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, May 22, 2018 at 4:29 PM, weand <andreas.we...@gmail.com> wrote:

> You can see it even better on this screenshot:
>
> TOP Entries Collapsed #2
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t8542/27_001.png>
>
>
> Sorry for the spam, attached a not so perfect screen in the mail before.
>
>
>
>


Re: [Structured Streaming] Metrics or logs of events that are ignored due to watermark

2018-07-02 Thread Jungtaek Lim
Hi,

I tried this in https://github.com/apache/spark/pull/21617 but soon
realized that it does not give an accurate count of late input rows, because
Spark applies the watermark lazily and discards rows at the state
operator(s), whose inputs are not necessarily the same as the original input
rows (some rows are already filtered out, and multiple rows may be
aggregated into one).

To get an accurate count of late input rows (or the rows themselves), we
would have to filter out late input rows in the first phase of the query.
That would be less flexible (in most cases a derived field could no longer
be the watermark field), but the majority of streaming frameworks adopt this
policy and expose late input rows based on it.
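
To illustrate the idea of filtering late rows in the first phase, here is a
minimal pure-Python sketch. It is a simplified model and not Spark's
implementation: the function name split_late_rows, the batch layout, and the
rule that the watermark for a batch is derived only from earlier batches are
all assumptions made for illustration.

```python
def split_late_rows(batches, delay):
    """Split each micro-batch of event times into (on_time, late) rows.

    Assumption: the watermark applied to a batch is
    max(event time seen in earlier batches) - delay.
    """
    watermark = None
    max_event_time = None
    results = []
    for batch in batches:
        # Rows older than the current watermark are counted as late
        # before any other operator sees them.
        on_time = [t for t in batch if watermark is None or t >= watermark]
        late = [t for t in batch if watermark is not None and t < watermark]
        results.append((on_time, late))
        # Advance the watermark only after the batch has been processed.
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
    return results


result = split_late_rows([[10, 12, 15], [14, 3, 16], [5, 20]], delay=10)
late_count = sum(len(late) for _, late in result)
print(late_count)  # 2 (the rows with event times 3 and 5)
```

Because the filter runs on the original input rows, the late count is exact,
which is the property the lazily applied watermark cannot give.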

So I think this is valuable to address, and I'm planning to try to address
it, but it would be OK for someone to address it earlier.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, Jul 3, 2018 at 3:39 AM, subramgr wrote:

> Hi all,
>
> Do we have any logs or metrics, recorded in log files or in some metrics
> sink, about the number of events that are ignored due to the watermark in
> structured streaming?
>
> Thanks
>
>
>
>
>


Re: Error while doing stream-stream inner join (java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access)

2018-07-03 Thread Jungtaek Lim
Could you please share the version of Spark and how you ran your app? If
you don't mind sharing a minimal app that reproduces this, that would be
really great.

- Jungtaek Lim (HeartSaVioR)
On Mon, 2 Jul 2018 at 7:56 PM kant kodali  wrote:

> Hi All,
>
> I get the error below quite often when I do a stream-stream inner join on
> two data frames. After running several experiments, stream-stream joins
> don't look stable enough for production yet. Any advice on this?
>
> Thanks!
>
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:301)
>


Re: RepartitionByKey Behavior

2018-06-21 Thread Jungtaek Lim
It is not possible because the cardinality of the partitioning key is
non-deterministic, while partition count should be fixed. There's a chance
that cardinality > partition count and then the system can't ensure the
requirement.
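
A small pure-Python sketch of why b and c collide, using the hash values
printed in the quoted output below. It assumes, as described in the quoted
mail, that the partition index is the partitioner's return value modulo the
partition count. The names by_hash, index_of and exclusive_partitioner are
hypothetical, and the second half only works when the full set of keys is
known up front, which is exactly the requirement that cannot be guaranteed in
general.

```python
# Hash values copied from the "Val ... Assigned Key ..." output in this thread.
hashes = {
    "a": -7583489610679606711,
    "d": 2755936516345535118,
    "b": -1175849324817995036,
    "c": 1421958803217889556,
}
n = 4

# Partition index = hash modulo partition count.
by_hash = {key: h % n for key, h in hashes.items()}
print(by_hash)  # {'a': 1, 'd': 2, 'b': 0, 'c': 0}

# If the distinct keys are known in advance, an explicit key -> index table
# gives each key its own partition (partition count = number of keys).
distinct_keys = sorted(hashes)
index_of = {key: i for i, key in enumerate(distinct_keys)}


def exclusive_partitioner(key):
    """Return a partition index that is unique per known key."""
    return index_of[key]


print([exclusive_partitioner(k) for k in distinct_keys])  # [0, 1, 2, 3]
```

With four distinct hashes and four partitions, nothing forces the hashes into
four different buckets; only an explicit mapping over a known key set does.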

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Jun 22, 2018 at 8:55 AM, Chawla,Sumit wrote:

> From reading the code, it looks like Spark takes the key modulo the number
> of partitions. The keys for c and b end up pointing to the same value.
> What's the best partitioning scheme to deal with this?
>
> Regards
>
> Sumit Chawla
>
>
> On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit 
> wrote:
>
>> Hi
>>
>> I have been trying to do this simple operation. I want to land all values
>> with one key in the same partition, and not have any different key in the
>> same partition. Is this possible? I keep getting b and c mixed up in the
>> same partition.
>>
>>
>> rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9),
>>                       ('b', 3), ('c', 8)])
>> from pyspark.rdd import portable_hash
>>
>> n = 4
>>
>> def partitioner(n):
>>     """Partition by the first item in the key tuple"""
>>     def partitioner_(x):
>>         val = x[0]
>>         key = portable_hash(x[0])
>>         print("Val %s Assigned Key %s" % (val, key))
>>         return key
>>     return partitioner_
>>
>> def validate(part):
>>     last_key = None
>>     for p in part:
>>         k = p[0]
>>         if not last_key:
>>             last_key = k
>>         if k != last_key:
>>             print("Mixed keys in partition %s %s" % (k, last_key))
>>
>> partioned = (rdd
>>     .keyBy(lambda kv: (kv[0], kv[1]))
>>     .repartitionAndSortWithinPartitions(
>>         numPartitions=n, partitionFunc=partitioner(n),
>>         ascending=False)).map(lambda x: x[1])
>>
>> print(partioned.getNumPartitions())
>> partioned.foreachPartition(validate)
>>
>>
>> Val a Assigned Key -7583489610679606711
>> Val a Assigned Key -7583489610679606711
>> Val d Assigned Key 2755936516345535118
>> Val b Assigned Key -1175849324817995036
>> Val c Assigned Key 1421958803217889556
>> Val d Assigned Key 2755936516345535118
>> Val b Assigned Key -1175849324817995036
>> Mixed keys in partition b c
>> Mixed keys in partition b c
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>


Re: [Spark Structured Streaming] Measure metrics from CsvSink for Rate source

2018-06-21 Thread Jungtaek Lim
I'm testing against 2.4.0-SNAPSHOT (not sure exactly which commit), and it
properly returns the input rate.

$ tail -F /tmp/spark-trial-metric/local-1529640063554.driver.spark.streaming.counts.inputRate-total.csv
t,value
1529640073,0.0
1529640083,0.9411272613196695
1529640093,0.9430996541967934
1529640103,1.0606060606060606
1529640113,0.9997000899730081

Could you add a streaming query listener and check the values of sources ->
numInputRows, inputRowsPerSecond, processedRowsPerSecond? They should
contain valid numbers.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Jun 22, 2018 at 11:49 AM, Dhruv Kumar wrote:

> Hi
>
> I was trying to measure the performance metrics for spark structured
> streaming. But I am unable to see any data in the metrics log files. My
> input source is the Rate source
> <https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#creating-streaming-dataframes-and-streaming-datasets>
>  which
> generates data at the specified number of rows per second. Here is the link
> to my code
> <https://gist.github.com/kudhru/e1ce6b3f399c546be5eeb1f590087992> and
> metrics.properties
> <https://gist.github.com/kudhru/5d8a8f4d53c766e9efad4de2ae9b82d6> file.
>
> When I run the above-mentioned code using spark-submit, I see that the
> metrics logs (for example,
> run_1.driver.spark.streaming.aggregates.inputRate-total.csv) are created
> under the specified directory but most of the values are 0.
> Below is a portion of the inputRate-total.csv file:
> 1529634585,0.0
> 1529634595,0.0
> 1529634605,0.0
> 1529634615,0.0
> 1529634625,0.0
> 1529634635,0.0
> 1529634645,0.0
> 1529634655,0.0
> 1529634665,0.0
> 1529634675,0.0
> 1529634685,0.0
> 1529634695,0.0
> 1529634705,0.0
> 1529634715,0.0
>
> Any idea why this is happening? Happy to share more information if that
> helps.
>
> Thanks
> --
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Jungtaek Lim
I could be wrong, so anyone is welcome to correct me if I'm missing something.

You can think of Spark operators in a narrow dependency as wrapped
functions applied to an iterator (like "op3(op2(op1(iter)))"), and under
that model there's no way to adjust the number of partitions. Each partition
is independent of the others and there's no communication between tasks.
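
The "op3(op2(op1(iter)))" picture can be sketched in plain Python. This is
only a toy model with made-up operators (op1, op2, op3 and run_task are
hypothetical names, not Spark APIs): each task sees exactly one partition, so
the number of output partitions always equals the number of input partitions.

```python
def op1(rows):
    # Narrow operator: add 1 to every row.
    return (r + 1 for r in rows)


def op2(rows):
    # Narrow operator: double every row.
    return (r * 2 for r in rows)


def op3(rows):
    # Narrow operator: keep rows divisible by 4.
    return (r for r in rows if r % 4 == 0)


def run_task(partition):
    """One task: the operators are chained over a single partition's iterator."""
    return list(op3(op2(op1(iter(partition)))))


partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
outputs = [run_task(p) for p in partitions]
print(outputs)  # [[4, 8], [12], [16, 20]]
```

No task ever reads another task's rows, so changing the partition count from
inside such a chain would need data movement that the chain cannot express.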

So if you have 1000 partitions (in terms of parallelism, not data
partitions) and want to reduce (or scale out) to some arbitrary number of
partitions, data has to move, which requires a shuffle.

The meaning of "spark.sql.shuffle.partitions" is especially important for
structured streaming because it defines the data partitions of state.
Structured streaming has a couple of operations that leverage state, which
is stored in the file system. The state is partitioned by the key columns,
and "spark.sql.shuffle.partitions" data partitions are generated. Because of
how hash partitioning works, once you have run the streaming query the value
of "spark.sql.shuffle.partitions" must stay unchanged (Spark doesn't allow
reconfiguring it).
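
A rough pure-Python sketch of why the partition count cannot change once
state has been written. It uses an identity function as a stand-in for the
real hash (toy_hash and state_partition are hypothetical names, and the
partition counts 4 and 5 are arbitrary): most keys map to a different
partition after the change, so their previously stored state would be looked
up in the wrong place.

```python
def toy_hash(key):
    # Stand-in for the real hash function; identity keeps the numbers readable.
    return key


def state_partition(key, num_partitions):
    """Partition that holds the state for a key under hash partitioning."""
    return toy_hash(key) % num_partitions


keys = range(10)
old_layout = {k: state_partition(k, 4) for k in keys}
new_layout = {k: state_partition(k, 5) for k in keys}

# Keys whose state would no longer be found in the partition that wrote it.
moved = [k for k in keys if old_layout[k] != new_layout[k]]
print(moved)  # [4, 5, 6, 7, 8, 9]
```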

So the value of the configuration represents the number of data partitions
of state, as well as the maximum parallelism of stateful operators. If we
want less parallelism (which is not the same as the number of partitions),
we should apply coalesce to the operator: the number of partitions stays
unchanged, while parallelism is reduced and fewer tasks are launched.

We just can't apply coalesce to individual operator in narrow dependency.

-Jungtaek Lim (HeartSaVioR)
On Thu, Aug 9, 2018 at 3:07 PM, Koert Kuipers wrote:

> well an interesting side effect of this is that i can now control the
> number of partitions for every shuffle in a dataframe job, as opposed to
> having a single setting for number of partitions across all shuffles.
>
> basically i can set spark.sql.shuffle.partitions to some huge number, and
> then for every groupByKey (or any other shuffle operation) follow it up
> with a coalesce to set the number of partitions. its like i have
> numPartitions back from those good old RDD shuffle methods :)
>
>
> On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers  wrote:
>
>> a new map task after a shuffle is also a narrow dependency, isnt it? its
>> narrow because data doesn't need to move, e.g. every partition depends on
>> single partition, preferably on same machine.
>>
>> modifying a previous shuffle to avoid a shuffle strikes me as odd, and
>> can potentially make a mess of performance, especially when no shuffle is
>> needed. just a new map task.
>>
>>
>> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim  wrote:
>>
>>> > shouldnt coalesce introduce a new map-phase with less tasks instead of
>>> changing the previous shuffle?
>>>
>>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>>> results in narrow dependency, hence no shuffle.
>>>
>>> So it is pretty clear that you need to use "repartition". Not sure
>>> there's any available trick to achieve it without calling repartition.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> 1.
>>> https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937
>>>
>>>
>>> On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers wrote:
>>>
>>>> sorry i meant to say:
>>>> with a checkpoint i get a map phase with lots of tasks to read the data,
>>>> then a reduce phase with 2048 reducers, and then finally a map phase with
>>>> 100 tasks.
>>>>
>>>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers 
>>>> wrote:
>>>>
>>>>> the only thing that seems to stop this so far is a checkpoint.
>>>>>
>>>>> with a checkpoint i get a map phase with lots of tasks to read the
>>>>> data, then a reduce phase with 2048 reducers, and then finally a map phase
>>>>> with 4 tasks.
>>>>>
>>>>> now i need to figure out how to do this without having to checkpoint.
>>>>> i wish i could insert something like a dummy operation that logical steps
>>>>> cannot jump over.
>>>>>
>>>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers 
>>>>> wrote:
>>>>>
>>>>>> ok thanks.
>>>>>>
>>>>>> mh. that seems odd. shouldnt coalesce introduce a new map-phase
>>>>>> with less tasks instead of changing the previous shuffle?
>>>>>>
>>>>>> using repartition seems too e

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Jungtaek Lim
> shouldnt coalesce introduce a new map-phase with less tasks instead of
changing the previous shuffle?

The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
results in narrow dependency, hence no shuffle.

So it is pretty clear that you need to use "repartition". Not sure there's
any available trick to achieve it without calling repartition.

Thanks,
Jungtaek Lim (HeartSaVioR)

1.
https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937


On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers wrote:

> sorry i meant to say:
> with a checkpoint i get a map phase with lots of tasks to read the data,
> then a reduce phase with 2048 reducers, and then finally a map phase with
> 100 tasks.
>
> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers  wrote:
>
>> the only thing that seems to stop this so far is a checkpoint.
>>
>> with a checkpoint i get a map phase with lots of tasks to read the data,
>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>> tasks.
>>
>> now i need to figure out how to do this without having to checkpoint. i
>> wish i could insert something like a dummy operation that logical steps
>> cannot jump over.
>>
>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers  wrote:
>>
>>> ok thanks.
>>>
>>> mh. that seems odd. shouldnt coalesce introduce a new map-phase with
>>> less tasks instead of changing the previous shuffle?
>>>
>>> using repartition seems too expensive just to keep the number of files
>>> down. so i guess i am back to looking for another solution.
>>>
>>>
>>>
>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov 
>>> wrote:
>>>
>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>> have to use `repartition` instead which is going to introduce an extra
>>>> shuffle stage
>>>>
>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
>>>> >
>>>> > one small correction: lots of files leads to pressure on the spark
>>>> driver program when reading this data in spark.
>>>> >
>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
>>>> wrote:
>>>> >>
>>>> >> hi,
>>>> >>
>>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>>> for a given column with a count, and finally i coalesce to a smaller number
>>>> of partitions before writing out to disk. so roughly:
>>>> >>
>>>> >>
>>>> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>>> >>
>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>> >>
>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>>>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>>>> over the partitioning of the groupBy.
>>>> >>
>>>> >> any idea why?
>>>> >>
>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>> files, lots of files leads to pressure down the line on executors reading
>>>> this data (i am writing to just one partition of a larger dataset), and
>>>> since i have less than 100 executors i expect it to be efficient. so sounds
>>>> like a good idea, no?
>>>> >>
>>>> >> but i do need 2048 partitions in my shuffle due to the operation i
>>>> am doing in the groupBy (in my real problem i am not just doing a 
>>>> count...).
>>>> >>
>>>> >> thanks!
>>>> >> koert
>>>> >>
>>>> >
>>>>
>>>>
>>>> --
>>>> Sent from my iPhone
>>>>
>>>
>>>
>>
>


Re: Structured Streaming doesn't write checkpoint log when I use coalesce

2018-08-09 Thread Jungtaek Lim
Which version do you use? The app below works with Spark 2.3.1, and 200
partitions are stored for state.

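// Spark imports used by this snippet. `conf`, `BenchmarkQueryHelper` and
// `QueryListenerWriteProgressToFile` are helpers that are not shown in this mail.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, max, min, window}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val queryStatusFile = conf.queryStatusFile()
val rateRowPerSecond = conf.rateRowPerSecond()
val rateRampUpTimeSecond = conf.rateRampUpTimeSecond()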

val ss = SparkSession
  .builder()
  .master("local[3]")
  .appName("state coalesce test")
  .getOrCreate()

ss.streams.addListener(new QueryListenerWriteProgressToFile(queryStatusFile))

import ss.implicits._

val df = ss.readStream
  .format("rate")
  .option("rowsPerSecond", rateRowPerSecond)
  .option("rampUpTime", s"${rateRampUpTimeSecond}s")
  .load()

df.printSchema()

val outDf = df.withWatermark("timestamp", "10 seconds")
  .selectExpr(
    "timestamp", "mod(value, 100) as mod", "value",
    BenchmarkQueryHelper.createCaseExprStr(
      "mod(CAST(RANDN(0) * 1000 as INTEGER), 50)", 50, 10) + " as word")
  .groupBy(
    window($"timestamp", "1 minute", "10 seconds"),
    $"mod", $"word")
  .agg(max("value").as("max_value"), min("value").as("min_value"),
    avg("value").as("avg_value"))
  .coalesce(8)

val query = outDf.writeStream
  .format("memory")
  .option("queryName", "stateCoalesceTest")
  .option("checkpointLocation", "/tmp/state-coalesce-test")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode(OutputMode.Update())
  .start()

query.awaitTermination()

-Jungtaek Lim (HeartSaVioR)


On Thu, Aug 9, 2018 at 8:38 PM, WangXiaolong wrote:

> Hi,
>
> Lately I ran into a problem while writing a structured streaming job that
> writes data into OpenTSDB.
> The write-stream part looks something like this:
>
>   outputDs
>   .coalesce(14)
>   .writeStream
>   .outputMode("append")
>   .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
>   .option("checkpointLocation",s"$checkpointDir/$appName/tsdb")
>   .foreach {
> TsdbWriter(
>   tsdbUrl,
>   MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword,
> mongoDatabase, mongoCollection,mongoAuthenticationDatabase)
> )(createMetricBuilder(tsdbMetricPrefix))
>   }
>   .start()
>
> When I check the checkpoint dir, I find that the "/checkpoint/state" dir
> is empty. I looked into the executor's log and found that the
> HDFSBackedStateStoreProvider didn't write anything to the checkpoint dir.
>
> The strange thing is that when I replace the "coalesce" call with
> "repartition", the problem goes away. Is there a difference between these
> two functions when using structured streaming?
>
> Looking forward to your help, thanks.
>
>
>
>
>


Re: getting error: value toDF is not a member of Seq[columns]

2018-09-05 Thread Jungtaek Lim
I guess you need an Encoder for the type that columns() produces.

https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229

implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(rdd))
}

You can see lots of Encoder implementations in the Scala code. If your type
doesn't match any of them, it may not work and you need to provide a custom
Encoder.

-Jungtaek Lim (HeartSaVioR)

On Wed, Sep 5, 2018 at 5:24 PM, Mich Talebzadeh wrote:

> Thanks
>
> I already do that as below
>
> val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
>   import sqlContext.implicits._
>
> but still getting the error!
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim  wrote:
>
>> You may need to import implicits from your spark session like below:
>> (Below code is borrowed from
>> https://spark.apache.org/docs/latest/sql-programming-guide.html)
>>
>> import org.apache.spark.sql.SparkSession
>> val spark = SparkSession
>>   .builder()
>>   .appName("Spark SQL basic example")
>>   .config("spark.some.config.option", "some-value")
>>   .getOrCreate()
>> // For implicit conversions like converting RDDs to DataFrames
>> import spark.implicits._
>>
>>
>> On Wed, Sep 5, 2018 at 5:11 PM, Mich Talebzadeh wrote:
>>
>>> Hi,
>>>
>>> I have spark streaming that send data and I need to put that data into
>>> MongoDB for test purposes. The easiest way is to create a DF from the
>>> individual list of columns as below
>>>
>>> I loop over individual rows in RDD and perform the following
>>>
>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>>   PRICE: Float)
>>>
>>> for (line <- pricesRDD.collect.toArray)
>>> {
>>>   var key = line._2.split(',').view(0).toString
>>>   var ticker = line._2.split(',').view(1).toString
>>>   var timeissued = line._2.split(',').view(2).toString
>>>   var price = line._2.split(',').view(3).toFloat
>>>   val priceToString = line._2.split(',').view(3)
>>>   if (price > 90.0)
>>>   {
>>>     println("price > 90.0, saving to MongoDB collection!")
>>>     // Save prices to mongoDB collection
>>>     var df = Seq(columns(key, ticker, timeissued, price)).toDF
>>>
>>> but it fails with message
>>>
>>>  value toDF is not a member of Seq[columns].
>>>
>>> What would be the easiest way of resolving this please?
>>>
>>> thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>


Re: getting error: value toDF is not a member of Seq[columns]

2018-09-05 Thread Jungtaek Lim
You may also find the link below useful (though it looks quite old). A case
class is exactly the kind of type for which an Encoder is available, so
there may be another reason that prevents the implicit conversion.

https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache/m-p/29994/highlight/true#M973

And which Spark version do you use?


On Wed, Sep 5, 2018 at 5:32 PM, Jungtaek Lim wrote:

> Sorry I guess I pasted another method. the code is...
>
> implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): 
> DatasetHolder[T] = {
>   DatasetHolder(_sqlContext.createDataset(s))
> }
>
>
> On Wed, Sep 5, 2018 at 5:30 PM, Jungtaek Lim wrote:
>
>> I guess you need to have encoder for the type of result for columns().
>>
>>
>> https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229
>>
>> implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
>>   DatasetHolder(_sqlContext.createDataset(rdd))
>> }
>>
>> You can see lots of Encoder implementations in the scala code. If your
>> type doesn't match anything it may not work and you need to provide custom
>> Encoder.
>>
>> -Jungtaek Lim (HeartSaVioR)
>>
>> On Wed, Sep 5, 2018 at 5:24 PM, Mich Talebzadeh wrote:
>>
>>> Thanks
>>>
>>> I already do that as below
>>>
>>> val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
>>>   import sqlContext.implicits._
>>>
>>> but still getting the error!
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim  wrote:
>>>
>>>> You may need to import implicits from your spark session like below:
>>>> (Below code is borrowed from
>>>> https://spark.apache.org/docs/latest/sql-programming-guide.html)
>>>>
>>>> import org.apache.spark.sql.SparkSession
>>>> val spark = SparkSession
>>>>   .builder()
>>>>   .appName("Spark SQL basic example")
>>>>   .config("spark.some.config.option", "some-value")
>>>>   .getOrCreate()
>>>> // For implicit conversions like converting RDDs to DataFrames
>>>> import spark.implicits._
>>>>
>>>>
>>>> On Wed, Sep 5, 2018 at 5:11 PM, Mich Talebzadeh wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have spark streaming that send data and I need to put that data into
>>>>> MongoDB for test purposes. The easiest way is to create a DF from the
>>>>> individual list of columns as below
>>>>>
>>>>> I loop over individual rows in RDD and perform the following
>>>>>
>>>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>>>>   PRICE: Float)
>>>>>
>>>>> for (line <- pricesRDD.collect.toArray)
>>>>> {
>>>>>   var key = line._2.split(',').view(0).toString
>>>>>   var ticker = line._2.split(',').view(1).toString
>>>>>   var timeissued = line._2.split(',').view(2).toString
>>>>>   var price = line._2.split(',').view(3).toFloat
>>>>>   val priceToString = line._2.split(',').view(3)
>>>>>   if (price > 90.0)
>>>>>   {
>>>>>     println("price > 90.0, saving to MongoDB collection!")
>>>>>     // Save prices to mongoDB collection
>>>>>     var df = Seq(columns(key, ticker, timeissued, price)).toDF
>>>>>
>>>>> but it fails with message
>>>>>
>>>>>  value toDF is not a member of Seq[columns].
>>>>>
>>>>> What would be the easiest way of resolving this please?
>>>>>
>>>>> thanks
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>


Re: getting error: value toDF is not a member of Seq[columns]

2018-09-05 Thread Jungtaek Lim
You may need to import implicits from your spark session like below:
(Below code is borrowed from
https://spark.apache.org/docs/latest/sql-programming-guide.html)

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._


On Wed, Sep 5, 2018 at 5:11 PM, Mich Talebzadeh wrote:

> Hi,
>
> I have spark streaming that send data and I need to put that data into
> MongoDB for test purposes. The easiest way is to create a DF from the
> individual list of columns as below
>
> I loop over individual rows in RDD and perform the following
>
> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>   PRICE: Float)
>
> for (line <- pricesRDD.collect.toArray)
> {
>   var key = line._2.split(',').view(0).toString
>   var ticker = line._2.split(',').view(1).toString
>   var timeissued = line._2.split(',').view(2).toString
>   var price = line._2.split(',').view(3).toFloat
>   val priceToString = line._2.split(',').view(3)
>   if (price > 90.0)
>   {
>     println("price > 90.0, saving to MongoDB collection!")
>     // Save prices to mongoDB collection
>     var df = Seq(columns(key, ticker, timeissued, price)).toDF
>
> but it fails with message
>
>  value toDF is not a member of Seq[columns].
>
> What would be the easiest way of resolving this please?
>
> thanks
>
> Dr Mich Talebzadeh
>
>


Re: getting error: value toDF is not a member of Seq[columns]

2018-09-05 Thread Jungtaek Lim
Sorry, I think I pasted the wrong method. The relevant code is:

implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]):
DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(s))
}


On Wed, Sep 5, 2018 at 5:30 PM, Jungtaek Lim wrote:

> I guess you need to have an Encoder for the type that columns() produces.
>
>
> https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229
>
> implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
>   DatasetHolder(_sqlContext.createDataset(rdd))
> }
>
> You can see lots of Encoder implementations in the Scala code. If your
> type doesn't match any of them, it may not work and you will need to
> provide a custom Encoder.
>
> -Jungtaek Lim (HeartSaVioR)
>
> On Wed, Sep 5, 2018 at 5:24 PM, Mich Talebzadeh wrote:
>
>> Thanks
>>
>> I already do that as below
>>
>> val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
>>   import sqlContext.implicits._
>>
>> but still getting the error!
>>


Re: getting error: value toDF is not a member of Seq[columns]

2018-09-06 Thread Jungtaek Lim
This code works with Spark 2.3.0 via spark-shell.

scala> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
PRICE: Float)
defined class columns

scala> import spark.implicits._
import spark.implicits._

scala> var df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF
18/09/06 18:02:23 WARN ObjectStore: Failed to get database global_temp,
returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
more fields]

scala> df
res0: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2
more fields]

We may need to know the actual types of key, ticker, timeissued and price
in your code.
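
As a rough sketch of the transformation asked about in this thread (one
comma-separated String turned into a one-row DataFrame), something like the
following may help. It assumes Spark 2.3 on the classpath; the names Columns
and CsvLineToDF and the local[*] master are illustrative only. The case class
is declared at top level because a case class declared inside the method that
calls toDF is one commonly reported cause of the "value toDF is not a member"
error; whether that is the cause here is not confirmed by the thread.

```scala
import org.apache.spark.sql.SparkSession

// Declared at top level (not inside a method) so that an implicit Encoder
// can be derived for it. The name is illustrative.
case class Columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object CsvLineToDF {
  // Plain parsing step: split the line and convert the fourth field to Float.
  def parse(line: String): Columns = {
    val f = line.split(',')
    Columns(f(0), f(1), f(2), f(3).toFloat)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-line-to-df")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()
    // Brings localSeqToDatasetHolder into scope, which provides toDF on Seq.
    import spark.implicits._

    val line = "6e84b11d-cb03-44c0-aab6-37e06e06c996,MRW,2018-09-06T09:35:53,275.45"
    val df = Seq(parse(line)).toDF()
    df.printSchema()

    spark.stop()
  }
}
```

The parsing is kept separate from the Spark call so that the actual field
types going into the case class are easy to inspect.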

Jungtaek Lim (HeartSaVioR)

On Thu, Sep 6, 2018 at 5:57 PM, Mich Talebzadeh wrote:

> I am trying to understand why spark cannot convert a simple comma
> separated columns as DF.
>
> I did a test
>
> I took one line of print and stored it as a one liner csv file as below
>
> var allInOne = key+","+ticker+","+timeissued+","+price
> println(allInOne)
>
> cat crap.csv
> 6e84b11d-cb03-44c0-aab6-37e06e06c996,MRW,2018-09-06T09:35:53,275.45
>
> Then after storing it in HDFS, I read that file as below
>
> import org.apache.spark.sql.functions._
> val location="hdfs://rhes75:9000/tmp/crap.csv"
> val df1 = spark.read.option("header", false).csv(location)
> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE:
> Double)
> val df2 = df1.map(p => columns(p(0).toString,p(1).toString,
> p(2).toString,p(3).toString.toDouble))
> df2.printSchema
>
> This is the result I get
>
> df1: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 more
> fields]
> defined class columns
> df2: org.apache.spark.sql.Dataset[columns] = [KEY: string, TICKER: string
> ... 2 more fields]
> root
>  |-- KEY: string (nullable = true)
>  |-- TICKER: string (nullable = true)
>  |-- TIMEISSUED: string (nullable = true)
>  |-- PRICE: double (nullable = false)
>
> So in my case the only difference is that that comma separated line is
> stored in a String as opposed to csv.
>
> How can I achieve this simple transformation?
>
> Thanks
>
> On Thu, 6 Sep 2018 at 03:38, Manu Zhang  wrote:
>
>> Have you tried adding Encoder for columns as suggested by Jungtaek Lim ?
>>
>> On Thu, Sep 6, 2018 at 6:24 AM Mich Talebzadeh 
>> wrote:
>>
>>>
>>> I can rebuild the comma separated list as follows:
>>>
>>>
>>>case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>> PRICE: Float)
>>> val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
>>> import sqlContext.implicits._
>>>
>>>
>>>  for(line <- pricesRDD.collect.toArray)
>>>  {
>>>var key = line._2.split(',').view(0).toString
>>>var ticker =  line._2.split(',').view(1).toString
>>>var timeissued = line._2.split(',').view(2).toString
>>>var price = line._2.split(',').view(3).toFloat
>>>var allInOne = key+","+ticker+","+timeissued+","+price
>>>pri

Re: [Structured Streaming] Custom StateStoreProvider

2018-07-12 Thread Jungtaek Lim
Girish,

I think reading through the implementation of HDFSBackedStateStoreProvider,
as well as the relevant traits, should give you an idea of how to implement
a custom one. HDFSBackedStateStoreProvider is not that complicated to read
and understand. You just need to deal with your underlying storage engine.
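
For context, a custom provider is plugged in through the
spark.sql.streaming.stateStore.providerClass setting. The sketch below only
shows that wiring; com.example.MyStateStoreProvider is a hypothetical class
name standing in for your own implementation, and the local[*] master is an
assumption for illustration. The provider class itself is only looked up
when a stateful streaming query starts.

```scala
import org.apache.spark.sql.SparkSession

object CustomStateStoreConfig {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("custom-state-store")
      .master("local[*]") // assumption: local run for illustration
      // Hypothetical class name; replace it with your own StateStoreProvider.
      .config("spark.sql.streaming.stateStore.providerClass",
        "com.example.MyStateStoreProvider")
      .getOrCreate()

    // Shows which provider class stateful streaming queries would use.
    println(spark.conf.get("spark.sql.streaming.stateStore.providerClass"))

    spark.stop()
  }
}
```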

Tathagata,

Is there a plan to turn StateStore and the relevant traits into a public
API? We have two annotations (InterfaceStability and Experimental) to mark
an evolving public API, and since the state store provider is pluggable, it
sounds better to make it a public API marked as evolving.

On Wed, Jul 11, 2018 at 12:40 PM, Tathagata Das wrote:

> Note that this is not public API yet. Hence this is not very documented.
> So use it at your own risk :)
>
> On Tue, Jul 10, 2018 at 11:04 AM, subramgr 
> wrote:
>
>> Hi,
>>
>> This looks very daunting *trait* is there some blog post or some articles
>> which explains on how to implement this *trait*
>>
>> Thanks
>> Girish
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: is it ok to make I/O calls in UDF? other words is it a standard practice ?

2018-04-24 Thread Jungtaek Lim
Another thing you may want to be aware of: if the result of the I/O call is
not idempotent, your query result is not idempotent either. For fault
tolerance, there's a chance that a record (row) will be replayed (recomputed).
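
To make the replay point concrete, here is a small plain-Scala sketch (no
Spark involved). The object and function names are illustrative, and the
"replay" is simulated by simply processing one row twice: an append-style
side effect ends up with a duplicate, while a keyed upsert does not.

```scala
import scala.collection.mutable

object ReplayDemo {
  // Non-idempotent side effect: every call appends, so a replayed row is
  // stored twice.
  def appendSink(log: mutable.ArrayBuffer[(Int, String)])(id: Int, v: String): Unit =
    log += ((id, v))

  // Idempotent side effect: a keyed upsert, so a replayed row overwrites
  // itself with the same value.
  def upsertSink(store: mutable.Map[Int, String])(id: Int, v: String): Unit =
    store(id) = v

  // Feeds the same rows to both sinks and returns (append size, upsert size).
  def run(rows: Seq[(Int, String)]): (Int, Int) = {
    val log = mutable.ArrayBuffer.empty[(Int, String)]
    val store = mutable.Map.empty[Int, String]
    rows.foreach { case (id, v) =>
      appendSink(log)(id, v)
      upsertSink(store)(id, v)
    }
    (log.size, store.size)
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(1 -> "a", 2 -> "b")
    // Simulated retry: row 1 is recomputed once more.
    val replayed = rows ++ rows.take(1)
    val (appended, upserted) = run(replayed)
    println(s"append sink size: $appended") // 3
    println(s"upsert sink size: $upserted") // 2
  }
}
```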

-Jungtaek Lim (HeartSaVioR)

On Tue, Apr 24, 2018 at 2:07 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> What is your use case?
>
> > On 23. Apr 2018, at 23:27, kant kodali <kanth...@gmail.com> wrote:
> >
> > Hi All,
> >
> > Is it ok to make I/O calls in UDF? other words is it a standard practice?
> >
> > Thanks!
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread Jungtaek Lim
I think I missed something: the self-join is not needed if you define a UDAF
and use it in the aggregation. Since the UDAF needs access to all fields, I
couldn't find any approach other than wrapping the fields into a struct and
unwrapping them afterwards. There doesn't seem to be a way to pass multiple
fields to a UDAF, at least through RelationalGroupedDataset.

Here's the working code which runs fine in console:


import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.sql.Date

class MaxRow extends UserDefinedAggregateFunction {
  // These are the input fields for your aggregate function.
  override def inputSchema: org.apache.spark.sql.types.StructType =
    new StructType().add("st", StructType(Seq(
      StructField("AMOUNT", IntegerType, true),
      StructField("MY_TIMESTAMP", DateType, true))
    ))

  // These are the internal fields you keep for computing your aggregate.
  override def bufferSchema: StructType =
    new StructType().add("st", StructType(Seq(
      StructField("AMOUNT", IntegerType, true),
      StructField("MY_TIMESTAMP", DateType, true))
    ))

  // This is the output type of your aggregation function.
  override def dataType: DataType =
    new StructType().add("st", StructType(Seq(
      StructField("AMOUNT", IntegerType, true),
      StructField("MY_TIMESTAMP", DateType, true))
    ))

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
  }

  // This is how to update your buffer schema given an input.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val inputRowStruct = input.getAs[Row](0)
    if (buffer.getAs[Row](0) == null ||
        buffer.getAs[Row](0).getInt(0) < input.getAs[Row](0).getInt(0)) {
      buffer(0) = inputRowStruct
    }
  }

  // This is how to merge two objects with the bufferSchema type.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (buffer1.getAs[Row](0) == null || (buffer2.getAs[Row](0) != null &&
        buffer1.getAs[Row](0).getInt(0) < buffer2.getAs[Row](0).getInt(0))) {
      buffer1(0) = buffer2(0)
    }
  }

  // This is where you output the final value, given the final value of
  // your bufferSchema.
  override def evaluate(buffer: Row): Any = {
    buffer
  }
}

spark.udf.register("maxrow", new MaxRow)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
import spark.implicits._

val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("AMOUNT", IntegerType, true),
  StructField("MY_TIMESTAMP", DateType, true)
))

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .selectExpr("data.ID as ID",
    "struct(data.AMOUNT, data.MY_TIMESTAMP) as structure")
  .groupBy($"ID")
  .agg("structure" -> "maxrow")
  .selectExpr("ID", "`maxrow(structure)`.st.AMOUNT",
    "`maxrow(structure)`.st.MY_TIMESTAMP")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()


You would still want to group records by event-time window and watermark:
even when I put all five records into the socket together (via nc), two
micro-batches handled the records and produced two results.

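-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+------------+
| ID|AMOUNT|MY_TIMESTAMP|
+---+------+------------+
|  1|    10|  2018-04-01|
|  2|    30|  2018-04-01|
+---+------+------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---+------+------------+
| ID|AMOUNT|MY_TIMESTAMP|
+---+------+------------+
|  2|    40|  2018-04-01|
+---+------+------------+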
- Jungtaek Lim (HeartSaVioR)

On Wed, Apr 18, 2018 at 5:56 AM, Jungtaek Lim <kabh...@gmail.com> wrote:

> That might be simple if you want to get aggregated values for both amount
> and my_timestamp:
>
> val schema = StructType(Seq(
>   StructField("ID", IntegerType, true),
>   StructField("AMOUNT", IntegerType, true),
>   StructField("MY_TIMESTAMP", DateType, true)
> ))
>
> val query = socketDF
>   .selectExpr(&

Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread Jungtaek Lim
That might be simple if you want the aggregated values for both amount and
my_timestamp:

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("AMOUNT", IntegerType, true),
  StructField("MY_TIMESTAMP", DateType, true)
))

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.ID", $"data.AMOUNT", $"data.MY_TIMESTAMP")
  .groupBy($"ID")
  .agg("AMOUNT" -> "max", "MY_TIMESTAMP" -> "max")

which requires you to set the output mode to Update or Complete.

But I guess you would like to select the max row and use MY_TIMESTAMP from
that row, in which case I guess you need an inner self-join, like below:

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.ID", $"data.AMOUNT")
  .groupBy($"ID")
  .agg("AMOUNT" -> "max")

val query2 = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.ID".as("SELF_ID"), $"data.AMOUNT".as("SELF_AMOUNT"),
    $"data.MY_TIMESTAMP")

val query3 = query.join(query2, expr("""
   ID = SELF_ID AND
   `MAX(AMOUNT)` = SELF_AMOUNT
"""))

which is NOT valid, at least for Spark 2.3, because aggregation requires
Update/Complete mode but join requires Append mode.
(The Structured Streaming guide clearly states this limitation:
"Cannot use streaming aggregation before joins.")

If you can achieve this with mapGroupsWithState, you may want to stick with
that.

Btw, when you deal with streaming, you may want to define a logical batch
for all aggregations and joins by defining a window and a watermark. You
wouldn't want to get different results depending on the micro-batch, so you
would always want to work with event-time windows.
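
As a rough illustration of the window-and-watermark suggestion, the sketch
below groups by an event-time window so that the result depends on event
time rather than on how rows happen to fall into micro-batches. To stay
self-contained it uses Spark's built-in "rate" source instead of the socket
source from this thread, and derives ID and AMOUNT from the generated value
column. The window length, watermark delay, master and object name are all
arbitrary assumptions. Note that the event-time column here is a timestamp,
whereas the schema in this thread uses DateType.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

object WindowedMax {
  // Max AMOUNT per ID within each 30-second event-time window, tolerating
  // events that arrive up to 10 seconds late.
  def maxPerWindow(events: DataFrame): DataFrame =
    events
      .withWatermark("MY_TIMESTAMP", "10 seconds")
      .groupBy(window(col("MY_TIMESTAMP"), "30 seconds"), col("ID"))
      .agg(max(col("AMOUNT")).as("MAX_AMOUNT"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("windowed-max")
      .master("local[2]") // assumption: local run for illustration
      .getOrCreate()

    // The rate source emits (timestamp, value) rows; they are reshaped
    // here to look like the ID / AMOUNT / MY_TIMESTAMP rows in this thread.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(
        (col("value") % 3).as("ID"),
        col("value").as("AMOUNT"),
        col("timestamp").as("MY_TIMESTAMP"))

    val query = maxPerWindow(events).writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Update())
      .start()

    // Run for about a minute, then shut down.
    query.awaitTermination(60000L)
    query.stop()
    spark.stop()
  }
}
```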

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Apr 18, 2018 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi TD,
>
> Thanks for that. The only reason I ask is I don't see any alternative
> solution to solve the problem below using raw sql.
>
>
> How to select the max row for every group in spark structured streaming
> 2.3.0 without using order by since it requires complete mode or
> mapGroupWithState?
>
> *Input:*
>
> id | amount | my_timestamp
> ---
> 1  |  5 |  2018-04-01T01:00:00.000Z
> 1  | 10 |  2018-04-01T01:10:00.000Z
> 2  | 20 |  2018-04-01T01:20:00.000Z
> 2  | 30 |  2018-04-01T01:25:00.000Z
> 2  | 40 |  2018-04-01T01:30:00.000Z
>
> *Expected Output:*
>
> id | amount | my_timestamp
> ---
> 1  | 10 |  2018-04-01T01:10:00.000Z
> 2  | 40 |  2018-04-01T01:30:00.000Z
>
> Looking for a streaming solution using either raw sql like 
> sparkSession.sql("sql
> query") or similar to raw sql but not something like mapGroupWithState
>
> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Unfortunately no. Honestly it does not make sense as for type-aware
>> operations like map, mapGroups, etc., you have to provide an actual JVM
>> function. That does not fit in with the SQL language structure.
>>
>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> can we use mapGroupsWithState in raw SQL? or is it in the roadmap?
>>>
>>> Thanks!
>>>
>>>
>>>
>>
>


Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread Jungtaek Lim
Refined further: I got rid of wrapping the fields into a struct, but the
result type of the UDAF is still a struct. I need to extract the fields one
by one, but I guess I just haven't found a function that does it.

I wrote this code without an IDE and ran it from spark-shell, so there
should be many spots where you can shorten or clean it up.

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.sql.Date

class MaxRow extends UserDefinedAggregateFunction {
  // These are the input fields for your aggregate function.
  override def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(Seq(
      StructField("AMOUNT", IntegerType, true),
      StructField("MY_TIMESTAMP", DateType, true))
    )

  // These are the internal fields you keep for computing your aggregate.
  override def bufferSchema: StructType =
    StructType(Seq(
      StructField("AMOUNT", IntegerType, true),
      StructField("MY_TIMESTAMP", DateType, true))
    )

  // This is the output type of your aggregation function.
  override def dataType: DataType =
    new StructType().add("st", StructType(Seq(
      StructField("AMOUNT", IntegerType, true),
      StructField("MY_TIMESTAMP", DateType, true))
    ))

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
  }

  // This is how to update your buffer schema given an input.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getAs[Any](0) == null || buffer.getInt(0) < input.getInt(0)) {
      buffer(0) = input(0)
      buffer(1) = input(1)
    }
  }

  // This is how to merge two objects with the bufferSchema type.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (buffer1.getAs[Any](0) == null || (buffer2.getAs[Any](0) != null &&
        buffer1.getInt(0) < buffer2.getInt(0))) {
      buffer1(0) = buffer2(0)
      buffer1(1) = buffer2(1)
    }
  }

  // This is where you output the final value, given the final value of
  // your bufferSchema.
  override def evaluate(buffer: Row): Any = {
    Row(Row(buffer(0), buffer(1)))
  }
}

val maxrow = new MaxRow
spark.udf.register("maxrow", maxrow)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
import spark.implicits._

val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("AMOUNT", IntegerType, true),
  StructField("MY_TIMESTAMP", DateType, true)
))

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.ID", $"data.AMOUNT", $"data.MY_TIMESTAMP")
  .groupBy($"ID")
  .agg(maxrow(col("AMOUNT"), col("MY_TIMESTAMP")).as("maxrow"))
  .selectExpr("ID", "maxrow.st.AMOUNT", "maxrow.st.MY_TIMESTAMP")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()

- Jungtaek Lim (HeartSaVioR)


On Wed, Apr 18, 2018 at 7:41 AM, Jungtaek Lim <kabh...@gmail.com> wrote:

> I think I missed something: self-join is not needed via defining UDAF and
> using it from aggregation. Since it requires all fields to be accessed, I
> can't find any other approach than wrap fields into struct and unwrap
> afterwards. There doesn't look like way to pass multiple fields in UDAF, at
> least in RelationalGroupedDataset.
>
> Here's the working code which runs fine in console:
>
> 
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> import java.sql.Date
>
> class MaxRow extends UserDefinedAggregateFunction {
>   // This is the input fields for your aggregate function.
>   override def inputSchema: org.apache.spark.sql.types.StructType =
> new StructType().add("st", StructType(Seq(
> StructField("AMOUNT", IntegerType, true),
> StructField("MY_TIMESTAMP", DateType, true))
> )
>   )
>
>   // This is the internal fields you keep for computing your aggregate.
>   override def bufferSchema: StructTyp

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Jungtaek Lim
Thanks Michael for providing a great solution. It's great to remove the UDAF
and any need to provide fields manually.

Btw, your code has a compilation error. A ')' is missing, and after I fix
it, it complains again about another issue.

:66: error: overloaded method value max with alternatives:
  (columnName: String)org.apache.spark.sql.Column 
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
 cannot be applied to (org.apache.spark.sql.ColumnName,
org.apache.spark.sql.Column)

Could you check that your code works with Spark 2.3 (via spark-shell or
whatever)?

Thanks!
Jungtaek Lim (HeartSaVioR)

On Thu, Apr 19, 2018 at 8:34 AM, Michael Armbrust <mich...@databricks.com> wrote:

> You can calculate argmax using a struct.
>
> df.groupBy($"id").agg(max($"my_timestamp",
> struct($"*").as("data")).getField("data").select($"data.*")
>
> You could transcode this to SQL, it'll just be complicated nested queries.
>
>
> On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Arun,
>>
>> I want to select the entire row with the max timestamp for each group. I
>> have modified my data set below to avoid any confusion.
>>
>> *Input:*
>>
>> id | amount | my_timestamp
>> ---
>> 1  |  5 |  2018-04-01T01:00:00.000Z
>> 1  | 10 |  2018-04-01T01:10:00.000Z
>> 1  |  6 |  2018-04-01T01:20:00.000Z
>> 2  | 30 |  2018-04-01T01:25:00.000Z
>> 2  | 40 |  2018-04-01T01:30:00.000Z
>>
>> *Expected Output:*
>>
>> id | amount | my_timestamp
>> ---
>> 1  | 10 |  2018-04-01T01:10:00.000Z
>> 2  | 40 |  2018-04-01T01:30:00.000Z
>>
>> Looking for a streaming solution using either raw sql like 
>> sparkSession.sql("sql
>> query") or similar to raw sql but not something like mapGroupWithState
>>
>> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>
>>> Cant the “max” function used here ? Something like..
>>>
>>>
>>> stream.groupBy($"id").max("amount").writeStream.outputMode(“complete”/“update")….
>>>
>>> Unless the “stream” is already a grouped stream, in which case the above
>>> would not work since the support for multiple aggregate operations is not
>>> there yet.
>>>
>>> Thanks,
>>> Arun
>>>
>>> From: kant kodali <kanth...@gmail.com>
>>> Date: Tuesday, April 17, 2018 at 11:41 AM
>>> To: Tathagata Das <tathagata.das1...@gmail.com>
>>> Cc: "user @spark" <user@spark.apache.org>
>>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>>
>>> Hi TD,
>>>
>>> Thanks for that. The only reason I ask is I don't see any alternative
>>> solution to solve the problem below using raw sql.
>>>
>>>
>>> How to select the max row for every group in spark structured streaming
>>> 2.3.0 without using order by since it requires complete mode or
>>> mapGroupWithState?
>>>
>>> *Input:*
>>>
>>> id | amount | my_timestamp
>>> ---
>>> 1  |  5 |  2018-04-01T01:00:00.000Z
>>> 1  | 10 |  2018-04-01T01:10:00.000Z
>>> 2  | 20 |  2018-04-01T01:20:00.000Z
>>> 2  | 30 |  2018-04-01T01:25:00.000Z
>>> 2  | 40 |  2018-04-01T01:30:00.000Z
>>>
>>> *Expected Output:*
>>>
>>> id | amount | my_timestamp
>>> ---
>>> 1  | 10 |  2018-04-01T01:10:00.000Z
>>> 2  | 40 |  2018-04-01T01:30:00.000Z
>>>
>>> Looking for a streaming solution using either raw sql like 
>>> sparkSession.sql("sql
>>> query") or similar to raw sql but not something like mapGroupWithState
>>>
>>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Unfortunately no. Honestly it does not make sense as for type-aware
>>>> operations like map, mapGroups, etc., you have to provide an actual JVM
>>>> function. That does not fit in with the SQL language structure.
>>>>
>>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> can we use mapGroupsWithState in raw SQL? or is it in the roadmap?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Jungtaek Lim
Thanks Arun, I modified it a bit to try my best to avoid enumerating fields:

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  .agg(max(struct($"AMOUNT", $"*")).as("data"))
  .select($"data.*")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()

It still has a minor issue: the column "AMOUNT" shows up twice in the result
table, but otherwise everything works like a charm.
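
One possible way around the duplicated column, sketched here on a batch
DataFrame and not verified against the streaming query above, is to give the
leading sort field its own alias and drop it after unpacking the struct. The
alias SORT_KEY, the object name, the local master and the String timestamps
are all assumptions made for illustration; the sample rows mirror the data
set quoted in this thread.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object MaxRowByStruct {
  // For each ID, keeps the whole row with the largest AMOUNT. Structs are
  // compared field by field, so the aliased AMOUNT in first position acts
  // as the sort key; it is dropped again once the struct is unpacked.
  def maxRowPerId(df: DataFrame): DataFrame =
    df.groupBy(col("ID"))
      .agg(max(struct(col("AMOUNT").as("SORT_KEY"), col("*"))).as("data"))
      .select(col("data.*"))
      .drop("SORT_KEY")

  // Sample rows mirroring the data set quoted in this thread.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1, 5, "2018-04-01T01:00:00"),
      (1, 10, "2018-04-01T01:10:00"),
      (1, 6, "2018-04-01T01:20:00"),
      (2, 30, "2018-04-01T01:25:00"),
      (2, 40, "2018-04-01T01:30:00")
    ).toDF("ID", "AMOUNT", "MY_TIMESTAMP")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("max-row-by-struct")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()

    // Should keep (1, 10, ...01:10:00) and (2, 40, ...01:30:00).
    maxRowPerId(sample(spark)).orderBy("ID").show(false)

    spark.stop()
  }
}
```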

-Jungtaek Lim (HeartSaVioR)

On Thu, Apr 19, 2018 at 9:43 AM, Arun Mahadevan <ar...@apache.org> wrote:

> The below expr might work:
>
> df.groupBy($"id").agg(max(struct($"amount", 
> $"my_timestamp")).as("data")).select($"id", $"data.*")
>
>
> Thanks,
> Arun
>

Looking for some feedbacks on proposal - native support of session window

2018-09-27 Thread Jungtaek Lim
Hi users, I'm Jungtaek Lim, one of the contributors to the streaming part.

Recently I proposed a new feature: native support of session windows [1].
While it also tackles the edge cases that map/flatMapGroupsWithState don't
cover for session windows, its major benefit is better usability of session
windows, so I'd like to see how end users view the benefit of the proposal.

Please give feedback on either this thread or the SPIP discussion thread
[2]. Any feedback is appreciated, such as requests for more examples or
whether it could tackle more general cases.

Thanks in advance!
Jungtaek Lim (HeartSaVioR)

1.
https://docs.google.com/document/d/1_rMLmUSyGzb62RnP2A3WX6D6uRxox8Q_7WcoI_HrTw4/edit?usp=sharing
2.
https://lists.apache.org/thread.html/8dad35af6dc323abe25dd5cd92def105edc3c509ada3f941fedb48b6@%3Cdev.spark.apache.org%3E


Re: Processing Flexibility Between RDD and Dataframe API

2018-10-29 Thread Jungtaek Lim
Just my 2 cents as one of the contributors: while SQL semantics can express
various use cases data scientists encounter, I also agree that end users who
are more familiar with code than with SQL can feel it is not flexible.

But countless efforts have gone into Spark SQL (and Catalyst), so I guess it
is clear that Spark SQL and Structured Streaming are the way to go if your
workload fits into them; on the other hand, if it doesn't, just keep using
RDD. RDD is still what underlies Spark SQL, so I don't expect it to be
deprecated unless Spark renews the underlying architecture.
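
As a side note on flexibility, the typed Dataset API does offer map, flatMap
and reduce, and a DataFrame can be turned back into an RDD through .rdd when
needed. The sketch below is only an illustration with made-up data; the
object and function names and the local master are assumptions.

```scala
import org.apache.spark.sql.SparkSession

object TypedOps {
  // Splits lines into words and sums the word lengths with typed
  // Dataset operations.
  def wordLengthTotal(spark: SparkSession, lines: Seq[String]): Int = {
    import spark.implicits._
    lines.toDS()                // Dataset[String]
      .flatMap(_.split(" "))    // typed flatMap
      .map(_.length)            // typed map
      .reduce(_ + _)            // typed reduce
  }

  // Round trip: Dataset -> DataFrame -> RDD[Row] -> count of rows.
  def wordCountViaRdd(spark: SparkSession, lines: Seq[String]): Long = {
    import spark.implicits._
    val df = lines.toDS().flatMap(_.split(" ")).toDF("word")
    df.rdd.map(_.getString(0)).count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("typed-ops")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()

    val lines = Seq("a b", "c d e")
    println(wordLengthTotal(spark, lines)) // 5
    println(wordCountViaRdd(spark, lines)) // 5

    spark.stop()
  }
}
```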

-Jungtaek Lim

On Mon, Oct 29, 2018 at 12:06 AM, Adrienne Kole wrote:

> Thanks for bringing this issue to the mailing list.
> As an addition, I would also ask the same questions about the DStreams and
> Structured Streaming APIs.
> Structured Streaming is high level, which makes it difficult to express all
> business logic in it, although Databricks is pushing it and recommending
> its usage.
> Moreover, there is some work going on for continuous streaming.
> So, what is Spark's future vision: support all of them or concentrate on
> one, given that all those paradigms have separate processing semantics?
>
>
> Cheers,
> Adrienne
>
> On Sun, Oct 28, 2018 at 3:50 PM Soheil Pourbafrani 
> wrote:
>
>> Hi,
>> There are some functions like map, flatMap, reduce, and so on, that
>> form the basic data processing operations in big data (and Apache
>> Spark). But Spark, in new versions, introduces the high-level DataFrame API
>> and recommends using it, even though the DataFrame API has no such
>> functions and offers only many built-in functions and UDFs. It's
>> very inflexible (at least to me), and at many points I have to convert
>> DataFrames to RDDs and vice versa. My questions are:
>> Is RDD going to be outdated and, if so, what is the correct road map for
>> processing with Apache Spark, given that DataFrame doesn't support functions
>> like map and reduce? How do UDFs process the data: are they applied to
>> every row, like map functions? Does converting a DataFrame to an RDD come
>> with significant cost?
>>
>


Re: Equivalent of emptyDataFrame in StructuredStreaming

2018-11-05 Thread Jungtaek Lim
Could you explain what you're trying to do? There should be no batch when
there is no data in the stream, so it would end up as a no-op even if it
were possible.

- Jungtaek Lim (HeartSaVioR)

On Tue, Nov 6, 2018 at 8:29 AM Arun Manivannan wrote:

> Hi,
>
> I would like to create a "zero" value for a Structured Streaming Dataframe
> and unfortunately, I couldn't find any leads.  With Spark batch, I can do a
> "emptyDataFrame" or "createDataFrame" with "emptyRDD" but with
> StructuredStreaming, I am lost.
>
> If I use the "emptyDataFrame" as the zero value, I wouldn't be able to
> join them with any other DataFrames in the program because Spark doesn't
> allow you to mix batch and stream data frames. (isStreaming=false for the
> Batch ones).
>
> Any clue is greatly appreciated. Here are the alternatives that I have at
> the moment.
>
> *1. Reading from an empty file *
> *Disadvantages : poll is expensive because it involves IO and it's error
> prone in the sense that someone might accidentally update the file.*
>
> val emptyErrorStream = (spark: SparkSession) => {
>   spark
>     .readStream
>     .format("csv")
>     .schema(DataErrorSchema)
>     .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
>     .as[DataError]
> }
>
> *2. Use MemoryStream*
>
> *Disadvantages: MemoryStream itself is not recommended for production use 
> because of the ability to mutate it but I am converting it to DS immediately. 
> So, I am leaning towards this at the moment. *
>
>
> val emptyErrorStream = (spark:SparkSession) => {
>   implicit val sqlC = spark.sqlContext
>   MemoryStream[DataError].toDS()
> }
>
> Cheers,
> Arun
>


Re: Watermarking without aggregation with Structured Streaming

2018-09-30 Thread Jungtaek Lim
The purpose of a watermark is to bound how long records are handled, because
otherwise state would grow without limit. In other cases (non-stateful
operations), it is pretty normal to handle all records even if they're
pretty late.

Btw, there were some comments regarding this: while Spark currently
filters out late records only in stateful operations, some of us
(including me) think filtering out late records in an earlier phase (at the
source, or just after the source) makes more sense. It just didn't turn
into action, but I think it is still valid.

https://github.com/apache/spark/pull/21617#issuecomment-400119049

If we move the phase of filtering out late records, what you would like to
do may become the default behavior. This also means the output may
change for queries which use non-stateful operations, so it is not a
trivial change and may require consensus, e.g. via the SPIP process.
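
As an illustration only, filtering late records right after the source could
look like the toy model below. This is not Spark's implementation: the
function name, the record layout and the delay value are made up for the
example, and the watermark is simply taken as the maximum event time seen in
earlier batches minus the delay.

```python
from typing import Iterable, List, Optional, Tuple

Record = Tuple[str, int]  # (payload, event time in seconds), hypothetical layout


def filter_late_records(batches: Iterable[List[Record]], delay: int) -> List[List[Record]]:
    """Drop records whose event time is behind the watermark.

    Toy model: the watermark is (max event time seen in earlier batches - delay)
    and is only advanced after a batch has been processed.
    """
    watermark: Optional[int] = None       # no watermark before the first batch
    max_event_time: Optional[int] = None
    kept_batches = []
    for batch in batches:
        kept = [r for r in batch if watermark is None or r[1] >= watermark]
        kept_batches.append(kept)
        # Advance the watermark so that it applies to the next batch.
        for _, event_time in batch:
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
        if max_event_time is not None:
            watermark = max_event_time - delay
    return kept_batches


batches = [
    [("a", 100), ("b", 105)],
    [("c", 90), ("d", 97), ("e", 110)],  # watermark is now 105 - 10 = 95
]
print(filter_late_records(batches, delay=10))
# prints [[('a', 100), ('b', 105)], [('d', 97), ('e', 110)]]
```

Record "c" is dropped even though no aggregation is involved, which is the
behavior asked about in this thread.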

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sun, Sep 30, 2018 at 5:19 PM chandan prakash wrote:

> Interesting question.
> I do not think watermark is currently supported without an aggregation
> operation/groupBy.
>
> Reason:
> Watermark in Structured Streaming is used for limiting the size of state
> needed to keep intermediate information in-memory.
> And state only comes into the picture in the case of stateful processing.
> Also, in the code, it seems that filtering out records on the basis of the
> watermark happens only in the case of stateful operators
> (statefulOperators.scala).
> I have not tried running the code, though, and would like to know if someone
> can shed more light on this.
>
> Regards,
> Chandan
>
>
> On Sat, Sep 22, 2018 at 7:43 PM peay  wrote:
>
>> Hello,
>>
>> I am trying to use watermarking without aggregation, to filter out
>> records that are just too late, instead of appending them to the output. My
>> understanding is that aggregation is required for `withWatermark` to have
>> any effect. Is that correct?
>>
>> I am looking for something along the lines of
>>
>> ```
>> df.withWatermark("ts", ...).filter(F.col("ts") >
>> ```
>>
>> Is there any way to get the watermark value to achieve that?
>>
>> Thanks!
>>
>
>
> --
> Chandan Prakash
>
>


Re: Why does join use rows that were sent after watermark of 20 seconds?

2018-12-10 Thread Jungtaek Lim
Please refer to the Structured Streaming guide, which is very clear about
when the query will have unbounded state.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking

Quoting the doc:



In other words, you will have to do the following additional steps in the
join.

   1. Define watermark delays on both inputs such that the engine knows how
      delayed the input can be (similar to streaming aggregations)
   2. Define a constraint on event-time across the two inputs such that the
      engine can figure out when old rows of one input is not going to be
      required (i.e. will not satisfy the time constraint) for matches with the
      other input. This constraint can be defined in one of the two ways.
      1. Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN
         rightTime AND rightTime + INTERVAL 1 HOUR),
      2. Join on event-time windows (e.g. ...JOIN ON leftTimeWindow =
         rightTimeWindow).



So yes, the join condition should directly deal with the timestamp column;
otherwise state will grow infinitely.
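
For the query in this thread, a time range condition could be built roughly
as sketched below. The helper name, the renamed columns order_time and
invoice_time (both inputs currently call the column tstamp_trans) and the 20
second bound are assumptions made for the example, not something taken from
the original code.

```python
def time_range_join_condition(left_key: str, right_key: str,
                              left_time: str, right_time: str,
                              max_lag: str) -> str:
    """Build a SQL join condition that bounds the event-time difference."""
    return (
        f"{left_key} = {right_key} "
        f"AND {right_time} >= {left_time} "
        f"AND {right_time} <= {left_time} + interval {max_lag}"
    )


condition = time_range_join_condition(
    "s_order_id", "order_id", "order_time", "invoice_time", "20 seconds")
print(condition)

# With PySpark available, the condition would be used roughly like this
# (order_wm / invoice_wm are the watermarked streaming DataFrames, with the
# timestamp columns renamed to order_time / invoice_time beforehand):
#
#   from pyspark.sql.functions import expr
#   joined = order_wm.join(invoice_wm, expr(condition))
```

With both watermarks and such a bound in place, the engine has what it needs
to decide when old rows can no longer match and may be dropped from state.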

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, Dec 11, 2018 at 2:52 PM Abhijeet Kumar wrote:

> You mean to say that Spark will store all the data in memory forever :)
>
>
> On 10-Dec-2018, at 6:16 PM, Sandeep Katta <
> sandeep0102.opensou...@gmail.com> wrote:
>
> Hi Abhijeet,
>
> You are using an inner join with unbounded state, which means every record
> in one stream will match with the other stream indefinitely.
> If you want the intended behaviour, you should add timestamp conditions
> or a window operator in the join condition.
>
>
>
> On Mon, 10 Dec 2018 at 5:23 PM, Abhijeet Kumar <
> abhijeet.ku...@sentienz.com> wrote:
>
>> Hello,
>>
>> I’m using watermark to join two streams as you can see below:
>>
>> val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
>> val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
>> val join_df = order_wm
>>   .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
>>
>> My understanding of the above code is that it will keep each stream for
>> 20 secs after it arrives. But when I send one stream now and the other
>> after 20 secs, both still get joined. It seems like even after the
>> watermark has passed, Spark is holding the data in memory. I even tried
>> after 45 seconds and that got joined too.
>> I’m sending streams from two Kafka queues, and I’m creating tstamp_trans
>> with current timestamp values.
>>
>> This is creating confusion in my mind regarding watermark.
>>
>> Thank you,
>> Abhijeet Kumar
>>
>
>


Re: [Spark Structued Streaming]: Read kafka offset from a timestamp

2018-11-19 Thread Jungtaek Lim
It really depends on whether we use it only for starting a query (instead of
restoring from a checkpoint) or we want to restore a previous batch
from a specific time (restoring state as well).

The former would make sense, and I'll try to see whether I can address it.
The latter doesn't look possible, because we normally want to go
back at least a couple of hours, during which hundreds of batches could
have been processed, and no information for such batches is left. It looks
like you're talking about the latter, but if we agree the former is also
helpful enough, I can take a look at it.

Btw, it might also help batch queries; actually it sounds even more
helpful for batch queries.

-Jungtaek Lim (HeartSaVioR)

On Sun, Nov 18, 2018 at 9:53 AM puneetloya wrote:

> I would like to request a feature for reading data from the Kafka source
> based on a timestamp, so that if the application needs to process data from
> a certain time, it is able to do so. I do agree that there is the
> checkpoint, which gives us a continuation of stream processing, but what if
> I want to rewind the checkpoints?
> According to Spark experts, it's not advised to edit checkpoints, and
> finding the right offsets to replay in Spark is tricky, but replaying from a
> certain timestamp is a lot easier, at least with a decent monitoring system
> (the time from where things started to fall apart, like a buggy push or a
> bad setting change).
>
> The Kafka consumer API supports the method offsetsForTimes, which can easily
> give the right offsets:
>
> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/Consumer.html#offsetsForTimes
>
> Similar to startingOffsets and endingOffsets, it could support
> startTimestamp and endTimestamp.
>
> In a SaaS environment, where continuous data keeps flowing, these small
> tweaks can help us repair our systems. Spark Structured Streaming is
> already great, but features like these will keep things under control in a
> live production processing environment.
>
> Cheers,
> Puneet
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming

2018-11-26 Thread Jungtaek Lim
You may need to put some effort into triaging how much time is spent on each
part. Without such information you can only get general tips and tricks.
Please check the SQL tab and look at the DAG graph as well as the details
(logical plan, physical plan) to see whether you're happy with these plans.

A general tip from a quick look at the query: avoid calling withColumn
repeatedly and try to put the columns in one select statement. If I'm not
mistaken, it is known to be a bit costly since each call produces a new
Dataset. Defining a schema and using "from_json" will eliminate all the
calls to withColumn and the extra calls to "get_json_object".

- Jungtaek Lim (HeartSaVioR)

On Tue, Nov 27, 2018 at 2:44 PM Siva Samraj wrote:

> Hello All,
>
> I am using Spark 2.3 and I am trying to write a Spark Streaming
> join. It is a basic join, yet it is taking a long time to join the stream
> data. I am not sure whether there is any configuration we need to set in
> Spark.
>
> Code:
> *
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.types.TimestampType
>
> object OrderSalesJoin {
>   def main(args: Array[String]): Unit = {
>
> setEnvironmentVariables(args(0))
>
> val order_topic = args(1)
> val invoice_topic = args(2)
> val dest_topic_name = args(3)
>
> val spark =
> SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()
>
> val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name
>
> import spark.implicits._
>
>
> val order_df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>   .option("subscribe", order_topic)
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafka.replica.fetch.max.bytes", "15728640")
>   .load()
>
>
> val invoice_df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>   .option("subscribe", invoice_topic)
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafka.replica.fetch.max.bytes", "15728640")
>   .load()
>
>
> val order_details = order_df
>   .withColumn("s_order_id", get_json_object($"value".cast("String"),
> "$.order_id"))
>   .withColumn("s_customer_id",
> get_json_object($"value".cast("String"), "$.customer_id"))
>   .withColumn("s_promotion_id",
> get_json_object($"value".cast("String"), "$.promotion_id"))
>   .withColumn("s_store_id", get_json_object($"value".cast("String"),
> "$.store_id"))
>   .withColumn("s_product_id", get_json_object($"value".cast("String"),
> "$.product_id"))
>   .withColumn("s_warehouse_id",
> get_json_object($"value".cast("String"), "$.warehouse_id"))
>   .withColumn("unit_cost", get_json_object($"value".cast("String"),
> "$.unit_cost"))
>   .withColumn("total_cost", get_json_object($"value".cast("String"),
> "$.total_cost"))
>   .withColumn("units_sold", get_json_object($"value".cast("String"),
> "$.units_sold"))
>   .withColumn("promotion_cost",
> get_json_object($"value".cast("String"), "$.promotion_cost"))
>   .withColumn("date_of_order",
> get_json_object($"value".cast("String"), "$.date_of_order"))
>   .withColumn("tstamp_trans", current_timestamp())
>   .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans",
> "MMddHHmmss").cast(TimestampType))
>   .select($"s_customer_id", $"s_order_id", $"s_promotion_id",
> $"s_store_id", $"s_product_id",
> $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
> $"total_cost".cast("integer") as "total_cost",
> $"promotion_cost".cast("integer") as "promotion_cost",
> $"date_of_order", $"tstamp_trans", $"TIMESTAMP",

Re: Why does Apache Spark Master shutdown when Zookeeper expires the session

2019-03-06 Thread Jungtaek Lim
I guess this is a known issue and is being tracked by SPARK-15544 [1] and
SPARK-23530 [2] (they seem to be duplicates).

I guess that's the simplest implementation of H/A (since we don't bother
with the current state in the master) when a background process like
supervisord restarts the process once it is no longer running. But if no
such background process is set up, it may lead to all master processes
being shut down eventually.

IMHO the safer approach is storing all information in ZK (as the source of
truth), with only the leader master reading and writing it. The follower
masters just wait and load the information when one of them becomes the
leader. That would require quite a lot of changes, though.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-15544
2. https://issues.apache.org/jira/browse/SPARK-23530

On Tue, Mar 5, 2019 at 10:02 PM lokeshkumar wrote:

> As I understand, Apache Spark Master can be run in high availability mode
> using Zookeeper. That is, multiple Spark masters can run in Leader/Follower
> mode and these modes are registered with Zookeeper.
>
> In our scenario Zookeeper is expiring the session of the Spark Master which
> is acting as Leader. So the Spark Master which is the leader receives this
> notification and shuts down deliberately.
>
> Can someone explain why the decision was taken to shut down rather than
> retry?
>
> And why does Kafka retry connecting to Zookeeper when it receives the same
> expiry notification?
>


Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Jungtaek Lim
The query makes state grow infinitely. Could you consider applying a
watermark on "receivedAt" so that the unnecessary part of the state gets
cleared out? Other than a watermark, you could implement TTL-based eviction
via flatMapGroupsWithState, though you'll need to implement your own
"dropDuplicates".

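To show why a watermark keeps deduplication state bounded, here is a toy
model in plain Python. It is only a sketch and not how Spark implements it:
the function name, the event layout and the delay are assumptions for the
example. In the actual query the suggestion would presumably amount to
calling withWatermark on "receivedAt" before dropDuplicates.

```python
from typing import Iterable, List, Optional, Set, Tuple

Event = Tuple[str, int]  # (Id, receivedAt in seconds), hypothetical layout


def dedup_with_watermark(batches: Iterable[List[Event]], delay: int):
    """Deduplicate on (Id, receivedAt) and evict state behind the watermark.

    Returns the emitted events and the number of keys kept in state after
    each batch.
    """
    state: Set[Event] = set()        # keys seen so far and still retained
    watermark: Optional[int] = None
    max_seen: Optional[int] = None
    emitted: List[Event] = []
    state_sizes: List[int] = []
    for batch in batches:
        for event in batch:
            if watermark is not None and event[1] < watermark:
                continue             # too late: its state may already be gone
            if event not in state:
                state.add(event)
                emitted.append(event)
            if max_seen is None or event[1] > max_seen:
                max_seen = event[1]
        if max_seen is not None:
            watermark = max_seen - delay
            # Evict keys that can no longer receive an accepted duplicate.
            state = {key for key in state if key[1] >= watermark}
        state_sizes.append(len(state))
    return emitted, state_sizes


batches = [
    [("x", 100), ("x", 100), ("y", 101)],
    [("z", 200), ("x", 100)],
]
print(dedup_with_watermark(batches, delay=10))
# prints ([('x', 100), ('y', 101), ('z', 200)], [2, 1])
```

Without the eviction step the state would keep one entry per distinct key
forever, which matches the growth reported in this thread.
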
On Mon, Mar 11, 2019 at 5:59 AM Georg Heiler wrote:

> Use https://github.com/chermenin/spark-states instead
>
> On Sun, Mar 10, 2019 at 8:51 PM Arun Mahadevan <
> ar...@apache.org> wrote:
>
>>
>> Read the link carefully,
>>
>> This solution is available (*only*) in Databricks Runtime.
>>
>> You can enable RocksDB-based state management by setting the following
>> configuration in the SparkSession before starting the streaming query.
>>
>> spark.conf.set(
>>   "spark.sql.streaming.stateStore.providerClass",
>>   "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
>>
>>
>> On Sun, 10 Mar 2019 at 11:54, Lian Jiang  wrote:
>>
>>> Hi,
>>>
>>> I have a very simple SSS pipeline which does:
>>>
>>> val query = df
>>>   .dropDuplicates(Array("Id", "receivedAt"))
>>>   .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
>>>   .writeStream
>>>   .format("parquet")
>>>   .partitionBy("availabilityDomain", timePartitionCol)
>>>   .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
>>>   .option("path", "/data")
>>>   .option("checkpointLocation", "/data_checkpoint")
>>>   .start()
>>>
>>> After ingesting 2T records, the state under the checkpoint folder on HDFS
>>> (replication factor 2) grows to 2T bytes.
>>> My cluster has only 2T bytes, which means the cluster can barely handle
>>> further data growth.
>>>
>>> The online Spark documentation
>>> (https://docs.databricks.com/spark/latest/structured-streaming/production.html)
>>> says that using RocksDB helps an SSS job reduce JVM memory overhead. But I
>>> cannot find any documentation on how to set up RocksDB for SSS. The Spark
>>> class CheckpointReader seems to only handle HDFS.
>>>
>>> Any suggestions? Thanks!
>>>
>>>
>>>
>>>


Re: Structured Streaming & Query Planning

2019-03-18 Thread Jungtaek Lim
Almost everything is coupled with the logical plan right now, including the
updated range for each source in a new batch, the updated watermark for
stateful operations, and the random seed in each batch. Please refer to the
code below:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala

We might try replacing these things in the physical plan so that the logical
plan doesn't need to be re-evaluated, but I'm not sure it's feasible.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Mar 18, 2019 at 4:03 PM Paolo Platter wrote:

> I can understand that if you involve columns with variable distribution in
> join operations, it may change your execution plan, but most of the time
> this is not going to happen. In streaming, the most used operations are map,
> filter, grouping and stateful operations, and in all these cases I can't
> see how dynamic query planning could help.
>
> It could be useful to have a parameter to force a streaming query to
> calculate the query plan just once.
>
> Paolo
>
>
>
>
> --
> *From:* Alessandro Solimando 
> *Sent:* Thursday, March 14, 2019 6:59:50 PM
> *To:* Paolo Platter
> *Cc:* user@spark.apache.org
> *Subject:* Re: Structured Streaming & Query Planning
>
> Hello Paolo,
> generally speaking, query planning is mostly based on statistics and
> distributions of data values for the involved columns, which might
> significantly change over time in a streaming context, so for me it makes a
> lot of sense that it is run at every schedule, even though I understand
> your concern.
>
> For the second question I don't know how to (or if you even can) cache the
> computed query plan.
>
> If possible, would you mind sharing your findings afterwards? (Query
> planning on streaming is a very interesting and not yet sufficiently
> explored topic IMO.)
>
> Best regards,
> Alessandro
>
> On Thu, 14 Mar 2019 at 16:51, Paolo Platter 
> wrote:
>
>> Hi All,
>>
>>
>>
>> I would like to understand why in a streaming query (that should not be
>> able to change its behaviour across iterations) there is a
>> queryPlanning-Duration effort (in my case 33% of the trigger interval) at
>> every schedule. I don’t understand why this is needed and whether it is
>> possible to disable or cache it.
>>
>>
>>
>> Thanks
>>
>>
>>
>>
>>
>>
>> *Paolo Platter*
>>
>> *CTO*
>>
>> E-mail:paolo.plat...@agilelab.it
>>
>> Web Site:   www.agilelab.it
>>
>>
>>
>>
>>
>


Re: Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to access group: spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2

2019-02-13 Thread Jungtaek Lim
Adding to Gabor's answer: in Spark 3.0 end users can even provide the full
group id (please refer to SPARK-26350 [1]), but you may find it more
convenient to use the group id prefix Gabor mentioned (please refer to
SPARK-26121 [2]) to grant permission to a broader range of groups.

1. https://issues.apache.org/jira/browse/SPARK-26350
2. https://issues.apache.org/jira/browse/SPARK-26121
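
As a rough sketch of how these two settings could be passed to the Kafka
source: to the best of my knowledge the option names in Spark 3.0 are
"kafka.group.id" and "groupIdPrefix", but please check them against the
Kafka integration guide of your Spark version. The helper name, the broker
address and the prefix value below are made up for the example, and
rejecting the combination of both settings is a choice of this sketch, not
Spark behavior.

```python
from typing import Dict, Optional


def kafka_source_options(bootstrap_servers: str, topic: str,
                         group_id: Optional[str] = None,
                         group_id_prefix: Optional[str] = None) -> Dict[str, str]:
    """Collect Kafka source options, including the group id related ones."""
    if group_id and group_id_prefix:
        # Design choice of this sketch: fail fast instead of guessing a winner.
        raise ValueError("set either a full group id or a prefix, not both")
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
    }
    if group_id:
        options["kafka.group.id"] = group_id        # full group id (SPARK-26350)
    if group_id_prefix:
        options["groupIdPrefix"] = group_id_prefix  # prefix only (SPARK-26121)
    return options


opts = kafka_source_options("broker:9092", "cla-claim-raw",
                            group_id_prefix="claims-app")
print(opts)

# With PySpark available, the options would be applied roughly like this:
#
#   df = spark.readStream.format("kafka").options(**opts).load()
```

With a prefix, the Kafka ACL can then be granted on groups starting with
that prefix instead of on "spark-kafka-source-*".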

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Feb 13, 2019 at 6:36 PM Gabor Somogyi wrote:

> Hi Thomas,
>
> The issue occurs when the user does not have the READ permission on the
> consumer groups.
>
> In DStreams group ID is configured in application, for example:
> https://github.com/gaborgsomogyi/spark-dstream-secure-kafka-app/blob/161bf02eb3677aac604d63499041f72231d0e371/src/main/scala/com/cloudera/spark/examples/DirectKafkaWordCount.scala#L59
>
> In Structured Streaming the group ID is generated by Spark internally.
>
> Either one has to give access to the "spark-kafka-source-*" groups, or in
> Spark 3.0 this prefix can be configured with the "groupIdPrefix" parameter.
>
> BR,
> G
>
>
> On Wed, Feb 13, 2019 at 3:58 AM Allu Thomas
>  wrote:
>
>> Hi There,
>>
>> My use case is to read a simple json message from Kafka queue using Spark
>> Structured Streaming. But I’m getting the following error message when I
>> run  my Kafka consumer. I don’t get this error when using Spark direct
>> stream. The issue is happening only with structured streaming. Any help
>> would be greatly appreciated.
>>
>>
>> Exception in thread "main"
>> org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to
>> access group:
>> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
>> === Streaming Query ===
>> Identifier: [id = 6ab10eab-4f71-435c-8705-820e66cee47e, runId =
>> 48430367-9e14-450b-b8e0-27199b536403]
>> Current Committed Offsets: {}
>> Current Available Offsets: {}
>>
>>
>> Current State: ACTIVE
>> Thread State: RUNNABLE
>>
>>
>> Logical Plan:
>> KafkaSource[Subscribe[cla-claim-raw]]
>> at org.apache.spark.sql.execution.streaming.StreamExecution.org
>> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>> at
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
>> Caused by: org.apache.kafka.common.errors.GroupAuthorizationException:
>> Not authorized to access group:
>> spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2
>>
>> Thanks,
>> Thomas Thomas
>>
>


Re: java.lang.IllegalArgumentException: Unsupported class file major version 55

2019-02-07 Thread Jungtaek Lim
ASM 6 doesn't support Java 11. In the master branch (for Spark 3.0) there's
a dependency upgrade to ASM 7 and also some effort (if my understanding is
right) to support Java 11, so you may need to use a lower version of the JDK
(8 is safest) for Spark 2.4.0, and try out the master branch to prepare for
Java 11.
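
For reference, the "major version 55" in the error message can be mapped to
a Java release by reading the class file header. The sketch below does this
in plain Python; the function names are made up for the example, and the
sample header bytes are constructed by hand rather than read from a real
class file.

```python
import struct


def class_file_major_version(data: bytes) -> int:
    """Return the major version stored in a Java class file header."""
    # Header layout (big-endian): u4 magic 0xCAFEBABE, u2 minor, u2 major.
    if len(data) < 8:
        raise ValueError("too short to be a class file")
    magic, _minor, major = struct.unpack(">IHH", data[:8])
    if magic != 0xCAFEBABE:
        raise ValueError("not a Java class file")
    return major


def java_release(major: int) -> int:
    """Map a class file major version to the Java release that introduced it."""
    # Major version 49 corresponds to Java 5; each later release adds one.
    if major < 49:
        raise ValueError("mapping only covers Java 5 and later")
    return major - 44


header = bytes.fromhex("cafebabe00000037")  # minor 0, major 0x37 = 55
print(java_release(class_file_major_version(header)))  # prints 11
```

So major version 55 means class files from Java 11 (here most likely JDK
classes being read at runtime), which ASM 6 refuses to parse, while Java 8
class files carry major version 52.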

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Feb 7, 2019 at 9:18 PM Gabor Somogyi wrote:

> Hi Hande,
>
> "Unsupported class file major version 55" means a Java incompatibility.
> This error means you're trying to load a Java "class" file that was
> compiled with a newer version of Java than you have installed.
> For example, your .class file could have been compiled for JDK 8, and
> you're trying to run it with JDK 7.
> Are you sure 11 is the only JDK, i.e. the default one?
>
> A small number of people are playing with JDK 11, but it is not heavily
> tested or used.
> Spark may or may not work with it, but it is not recommended for production
> in general.
>
> BR,
> G
>
>
> On Thu, Feb 7, 2019 at 12:53 PM Hande, Ranjit Dilip (Ranjit) <
> ha...@avaya.com> wrote:
>
>> Hi,
>>
>> I am developing one java process which will consume data from Kafka using
>> Apache Spark Streaming.
>> For this I am using following:
>>
>> Java:
>> openjdk version "11.0.1" 2018-10-16 LTS
>> OpenJDK Runtime Environment Zulu11.2+3 (build 11.0.1+13-LTS) OpenJDK
>> 64-Bit Server VM Zulu11.2+3 (build 11.0.1+13-LTS, mixed mode)
>>
>> Maven: (Spark Streaming)
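>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
>>   <version>2.4.0</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-streaming_2.11</artifactId>
>>   <version>2.4.0</version>
>> </dependency>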
>>
>> I am able to compile project successfully but when I try to run I get
>> following error:
>>
>> {"@timestamp":"2019-02-07T11:54:30.624+05:30","@version":"1","message":"Application
>> run
>> failed","logger_name":"org.springframework.boot.SpringApplication","thread_name":"main","level":"ERROR","level_value":4,"stack_trace":"java.lang.IllegalStateException:
>> Failed to execute CommandLineRunner at
>> org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:816)
>> at
>> org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797)
>> at
>> org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
>> at
>> com.avaya.measures.AgentMeasures.AgentMeasuresApplication.main(AgentMeasuresApplication.java:41)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method) at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566) at
>> org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
>> at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) at
>>
>> org.springframework.boot.loader.Launcher.launch(Launcher.java:50) at
>> org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51)\r\nCaused
>> by: java.lang.IllegalArgumentException: Unsupported class file major
>> version 55 at
>>
>>  org.apache.xbean.asm6.ClassReader.(ClassReader.java:166) at
>> org.apache.xbean.asm6.ClassReader.(ClassReader.java:148) at
>> org.apache.xbean.asm6.ClassReader.(ClassReader.java:136) at
>> org.apache.xbean.asm6.ClassReader.(ClassReader.java:237) at
>> org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
>> at
>> org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
>> at
>> org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>> at
>> scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
>> at
>> scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
>> at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) at
>> scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134) at
>> scala.collection.TraversableLike$WithFilter.foreach(Traver

Offline state manipulation tool for structured streaming query

2019-04-13 Thread Jungtaek Lim
Hi Spark users, especially Structured Streaming users who are dealing with
stateful queries,

I'm pleased to introduce Spark State Tools, which enables offline state
manipulation for Structured Streaming queries.

Basically the tool exposes state as a batch source and sink, so that you
can read state, transform it, and even write it back. With the full
feature set of Spark SQL batch queries, you can do whatever you've
imagined with your state, including rescaling state (repartitioning) and
schema evolution.

A summary of the features:

- Show state information which you'll need to provide to use the features
  - state operator information, state schema
- Create a savepoint from an existing checkpoint of a Structured Streaming query
- Read state as a batch source of Spark SQL
- Write a DataFrame to state as a batch sink of Spark SQL
- Migrate state format from old to new
  - migrating Streaming Aggregation from ver 1 to 2
  - migrating FlatMapGroupsWithState from ver 1 to 2

And here's the GitHub repository of the tool:
https://github.com/HeartSaVioR/spark-state-tools

Artifacts are also published to Maven Central, so you can just pull the
artifact into your app.

I'd be happy to hear new ideas for improvements, and contributions are much
appreciated!

Enjoy!

Thanks,
Jungtaek Lim (HeartSaVioR)


Re: Spark structured streaming leftOuter join not working as I expect

2019-06-05 Thread Jungtaek Lim
I suspect that rows are never evicted from state in the second join. To
determine that a row is NOT matched on the other side, Spark has to check
whether the row was ever matched before it is evicted. You need to set a
watermark on either B_LAST_MOD or C_LAST_MOD.

If you already did but didn't show it here, please paste all the code
(assuming you've already redacted it) into a gist, or attach a zipped file of
the project.

Btw, there's a known "correctness" issue with streaming-streaming left/right
outer joins. Please refer to SPARK-26154 [1] for details. It's not the same
case, but it's good to know about when you're dealing with
streaming-streaming joins.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-26154

On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann  wrote:

> Hi all
>
> sorry, tl;dr
>
> I'm on my first Python Spark structured streaming app, in the end joining
> messages from ~10 different Kafka topics. I've recently upgraded to Spark
> 2.4.3, which has resolved all my issues with the time handling (watermarks,
> join windows) I had before with Spark 2.3.2.
>
> My current problem happens during a leftOuter join, where messages from 3
> topics are joined, the results are then aggregated with a groupBy and
> finally put onto a result Kafka topic. On the 3 input topics involved, all
> messages have ID and LAST_MOD fields. I use the ID for joining, and the
> LAST_MOD as event timestamp on all incoming streams. Since the fields on
> the incoming messages are all named the same (ID and LAST_MOD), I rename
> them on all incoming streams with
>
>  aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as
> A_LAST_MOD").drop(*["ID", "LAST_MOD"])
>
> For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> as event time, before joining. I know that the LAST_MOD timestamps are
> equal on the messages that I want to join together.
>
> The first join is an inner join, where a field on stream A links with the
> ID of stream B. So I have
>
>  (aDf
>     .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
>     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
>     .agg(
>         collect_list(struct("*")).alias("RESULTS"),
>         count("A_ID").alias("NUM_RESULTS"),
>         # just add a timestamp to watermark on, they are all the
>         min("A_LAST_MOD").alias("RESULT_LAST_MOD")
>     )
>     .withWatermark("RESULT_LAST_MOD", "30 seconds")
>  )
>
> This works perfectly and generates (on my current data set) some 10'000
> records. This is the expected result.
>
> When I add the leftOuter join of the third topic as follows
>
>  (aDf
>     .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
>     # here the additional left join
>     .join(cDF, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
>           "leftOuter")   # C_FK is the field in stream B
>     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
>     .agg(
>         collect_list(struct("*")).alias("RESULTS"),
>         count("A_ID").alias("NUM_RESULTS"),
>         # just add a timestamp to watermark on, they are all the
>         min("A_LAST_MOD").alias("RESULT_LAST_MOD")
>     )
>     .withWatermark("RESULT_LAST_MOD", "30 seconds")
>  )
>
> then what I would expect is that I get the same number of output records
> (~10'000), and some of them have the additional fields from the C stream.
>
> But what happens is that my output is reduced to ~1'500 records, exactly
> those which have a successful join on records on topic C. The others are
> not shown on the output.
>
> I already tried
>
>* make sure that the optional FK on topic B is never null, by using an
> NVL2(C_FK, C_FK, '')
>* widen the time window join on the leftOuter to "B_LAST_MOD <
> C_LAST_MOD - interval 5 seconds ..."
>* use various combinations of joinWindows and watermarkLateThreshold
>
> The result is always the same: I'm "losing" the ~8'500 records for which
> the optional join FK is NULL on topic B.
>
> Did I totally misunderstand the concept of stream-stream left outer join?
> Or what could be wrong
>
> --
> CU, Joe
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


Re: Spark structured streaming leftOuter join not working as I expect

2019-06-05 Thread Jungtaek Lim
Nice to hear you're investigating the issue deeply.

Btw, if attaching code is not easy, maybe you could share the logical/physical
plan of any batch: "detail" in the SQL tab shows the plan as a string.
Plans from sequential batches would be very helpful, and the streaming query
status of those batches (especially the watermark) would be helpful too.


On Wed, Jun 5, 2019 at 11:57 PM Joe Ammann  wrote:

> Hi Jungtaek
>
> Thanks for your response!
>
> I actually have set watermarks on all the streams A/B/C with the
> respective event time
> column A/B/C_LAST_MOD. So I think this should not be the reason.
>
> Of course, the event time on the C stream (the "optional one") progresses
> much slower
> than on the other 2. I try to adjust for this by setting
>
>spark.sql.streaming.multipleWatermarkPolicy=max
>
> and judging from the microbatch results, this also works. The global
> watermark seems
> to progress as expected with the event time from A/B stream.
>
> I will try to put together an isolated test case to reproduce the issue;
> the whole code is embedded in a larger app and hence not easy to rip out.
>
> I did some more testing, and for now these are my observations
>  - inner join followed by aggregation works as expected
>  - inner join with 1 left outer (and no aggregation) works as expected
>  - inner join with 2 left outer only produces results where both outer
> have a match
>  - inner join with 1 left outer followed by aggregation only produces the
> messages with a match
>
> Of course, all are stream-stream joins
>
> CU, Joe
>
> On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim 
> wrote:
> > I would suspect that rows are never evicted from state in the second
> > join. To determine that a row is NOT matched on the other side, Spark
> > has to check whether the row was ever matched before it is evicted. You
> > need to set a watermark on either B_LAST_MOD or C_LAST_MOD.
> >
> > If you already did but haven't shown it here, please paste all the code
> > (assuming you've already redacted it) to a gist or attach a zipped file
> > of the project.
> >
> > Btw, there's a known "correctness" issue with streaming-streaming
> > left/right outer joins. Please refer to SPARK-26154 [1] for details.
> > That's not the same case, but it is good to know when you're dealing
> > with streaming-streaming joins.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > 1. https://issues.apache.org/jira/browse/SPARK-26154
> >
> > On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann  wrote:
> >
> > > Hi all
> > >
> > > sorry, tl;dr
> > >
> > > I'm on my first Python Spark structured streaming app, in the end
> joining
> > > messages from ~10 different Kafka topics. I've recently upgraded to
> Spark
> > > 2.4.3, which has resolved all my issues with the time handling
> (watermarks,
> > > join windows) I had before with Spark 2.3.2.
> > >
> > > My current problem happens during a leftOuter join, where messages
> from 3
> > > topics are joined, the results are then aggregated with a groupBy and
> > > finally put onto a result Kafka topic. On the 3 input topics involved,
> all
> > > messages have ID and LAST_MOD fields. I use the ID for joining, and the
> > > LAST_MOD as event timestamp on all incoming streams. Since the fields
> on
> > > the incoming messages are all named the same (ID and LAST_MOD), I
> rename
> > > them on all incoming streams with
> > >
> > >  aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as
> > > A_LAST_MOD").drop(*["ID", "LAST_MOD"])
> > >
> > > For those data frames, I then take the watermark with the
> A/B/C_LAST_MOD
> > > as event time, before joining. I know that the LAST_MOD timestamps are
> > > equal on the messages that I want to join together.
> > >
> > > The first join is an inner join, where a field on stream A links with
> the
> > > ID of stream B. So I have
> > >
> > >  aDf
> > > .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in
> stream A
> > > .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > > .agg(
> > > collect_list(struct("*")).alias("RESULTS"),
> > > count("A_ID").alias("NUM_RESULTS"),
> > > # just add a timestamp to watermark on, they are all the
> > > min("A_LAST_MOD").alias("RE

Re: Spark structured streaming leftOuter join not working as I expect

2019-06-11 Thread Jungtaek Lim
Got the point. If you would like to get "correct" output, you may need to
set the global watermark policy to "min", because the watermark is used not
only for evicting rows from state, but also for discarding input rows that
are later than the watermark. Be aware that there are two stateful
operators here, each of which receives inputs from the previous stage and
discards late ones via the watermark before processing.

Btw, you may also need to consider how the concept of watermark differs
between Spark and other frameworks:

1. Spark uses a high watermark (it picks the highest event timestamp of the
input rows) even for a single watermark, whereas other frameworks use a low
watermark (they pick the lowest event timestamp of the input rows). So you
always need to set enough delay on the watermark.

2. Spark uses a global watermark whereas other frameworks normally use
operator-wise watermarks. This is a limitation of Spark (given that outputs
of the previous stateful operator become inputs of the next stateful
operator, they should have different watermarks). One of the contributors
has proposed an approach [1] which would fit Spark (unfortunately it hasn't
been reviewed by committers for a long time).
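
To make the "min" vs. "max" policy concrete, here is a minimal plain-Python
sketch (no Spark involved). The stream names, timestamps and delay are made up
for illustration, and treating a row as late when it is strictly older than
the watermark is an assumption about the boundary, not a statement of Spark's
exact rule.

```python
def stream_watermark(max_event_time, delay):
    """High watermark of one stream: highest event time seen minus the delay."""
    return max_event_time - delay


def global_watermark(per_stream, policy="min"):
    """Combine per-stream watermarks the way the 'min'/'max' policy names suggest."""
    if policy not in ("min", "max"):
        raise ValueError("policy must be 'min' or 'max'")
    pick = min if policy == "min" else max
    return pick(per_stream.values())


def is_late(event_time, watermark):
    """Assumed rule: a row older than the watermark is discarded as late."""
    return event_time < watermark


# Hypothetical highest event times (in seconds) seen so far on each stream.
# Stream C progresses much more slowly than A and B.
max_seen = {"A": 1000, "B": 1000, "C": 400}
delay = 30

per_stream = {name: stream_watermark(ts, delay) for name, ts in max_seen.items()}
wm_min = global_watermark(per_stream, "min")
wm_max = global_watermark(per_stream, "max")

# A new row on the slow stream C with event time 450.
c_row = 450
print("min policy:", wm_min, "late?", is_late(c_row, wm_min))
print("max policy:", wm_max, "late?", is_late(c_row, wm_max))
```

With "max" the global watermark follows the fast streams, so fresh rows on the
slow stream already look late; with "min" they are kept, at the cost of state
being held longer.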

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://github.com/apache/spark/pull/23576

On Tue, Jun 11, 2019 at 7:06 AM Joe Ammann  wrote:

> Hi all
>
> it took me some time to get the issues extracted into a piece of
> standalone code. I created the following gist
>
> https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17
>
> It has messages for 4 topics A/B/C/D and a simple Python program which
> shows 6 use cases, with my expectations and observations with Spark 2.4.3
>
> It would be great if you could have a look and check if I'm doing
> something wrong, or this is indeed a limitation of Spark?
>
> On 6/5/19 5:35 PM, Jungtaek Lim wrote:
> > Nice to hear you're investigating the issue deeply.
> >
> > Btw, if attaching code is not easy, maybe you could share
> logical/physical plan on any batch: "detail" in SQL tab would show up the
> plan as string. Plans from sequential batches would be much helpful - and
> streaming query status in these batch (especially watermark) should be
> helpful too.
> >
>
>
> --
> CU, Joe
>


Re: [StructuredStreaming] HDFSBackedStateStoreProvider is leaking .crc files.

2019-06-12 Thread Jungtaek Lim
Nice finding!

Given that you already pointed out the earlier issue which fixed a similar
problem, it should also be easy for you to craft a patch and verify whether
the fix resolves your issue. Looking forward to seeing your patch.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Jun 12, 2019 at 8:23 PM Gerard Maas  wrote:

> Ooops - linked the wrong JIRA ticket:  (that other one is related)
>
> https://issues.apache.org/jira/browse/SPARK-28025
>
> On Wed, Jun 12, 2019 at 1:21 PM Gerard Maas  wrote:
>
>> Hi!
>> I would like to socialize this issue we are currently facing:
>> The Structured Streaming default CheckpointFileManager leaks .crc files
>> by leaving them behind after users of this class (like
>> HDFSBackedStateStoreProvider) apply their cleanup methods.
>>
>> This results in an unbounded creation of tiny files that eat away storage
>> by the block and, in our case, deteriorates the file system performance.
>>
>> We correlated the processedRowsPerSecond reported by the
>> StreamingQueryProgress against a count of the .crc files in the storage
>> directory (checkpoint + state store). The performance impact we observe is
>> dramatic.
>>
>> We are running on Kubernetes, using GlusterFS as the shared storage
>> provider.
>> [image: out processedRowsPerSecond vs. files in storage_process.png]
>> I have created a JIRA ticket with additional detail:
>>
>> https://issues.apache.org/jira/browse/SPARK-17475
>>
>> This is also related to an earlier discussion about the state store
>> unbounded disk-size growth, which was left unresolved back then:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-State-Store-storage-behavior-for-the-Stream-Deduplication-function-td34883.html
>>
>> If there's any additional detail I should add/research, please let me
>> know.
>>
>> kind regards, Gerard.
>>
>>
>>


Re: Change parallelism number in Spark Streaming

2019-06-27 Thread Jungtaek Lim
Great, thanks! Even better if you could share the slides as well (and the
video too, if possible), since they would help other users understand the
details.

Thanks again,
Jungtaek Lim (HeartSaVioR)

On Thu, Jun 27, 2019 at 7:33 PM Jacek Laskowski  wrote:

> Hi,
>
> I've got a talk "The internals of stateful stream processing in Spark
> Structured Streaming" at https://dataxday.fr/ today and am going to
> include the tool on the slides to thank you for the work. Thanks.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> The Internals of Spark SQL https://bit.ly/spark-sql-internals
> The Internals of Spark Structured Streaming
> https://bit.ly/spark-structured-streaming
> The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
> Follow me at https://twitter.com/jaceklaskowski
>
>
>
> On Thu, Jun 27, 2019 at 3:32 AM Jungtaek Lim  wrote:
>
>> Glad to help, Jacek.
>>
>> I'm happy you're doing similar thing, which means it could be pretty
>> useful for others as well. Looks like it might be good enough to contribute
>> state source and sink. I'll sort out my code and submit a PR.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>>
>> On Thu, Jun 27, 2019 at 7:54 AM Jacek Laskowski  wrote:
>>
>>> Hi Jungtaek,
>>>
>>> That's very helpful to have the state source. As a matter of fact I've
>>> just this week been working on a similar tool (!) and have been wondering
>>> how to recreate the schema of the state key and value. You've helped me a
>>> lot. Thanks.
>>>
>>> Jacek
>>>
>>> On Wed, 26 Jun 2019, 23:58 Jungtaek Lim,  wrote:
>>>
>>>> Hi,
>>>>
>>>> you could consider state operator's partition numbers as "max
>>>> parallelism", as parallelism can be reduced via applying coalesce. It would
>>>> be effectively working similar as key groups.
>>>>
>>>> If you're also considering offline query, there's a tool to manipulate
>>>> state which enables reading and writing state in structured streaming,
>>>> achieving rescaling and schema evolution.
>>>>
>>>> https://github.com/HeartSaVioR/spark-state-tools
>>>> (DISCLAIMER: I'm an author of this tool.)
>>>>
>>>> Thanks,
>>>> Jungtaek Lim (HeartSaVioR)
>>>>
>>>> On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei 
>>>> wrote:
>>>>
>>>>> Thank you for your quick reply!
>>>>>
>>>>> Is there any plan to improve this?
>>>>>
>>>>> I asked this question due to some investigation on comparing those
>>>>> state of art streaming systems, among which Flink and DataFlow allow
>>>>> changing parallelism number, and by my knowledge of Spark Streaming, it
>>>>> seems it is also able to do that: if some “key interval” concept is used,
>>>>> then state can somehow decoupled from partition number by consistent
>>>>> hashing.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards
>>>>>
>>>>> Jialei
>>>>>
>>>>>
>>>>>
>>>>> *From: *Jacek Laskowski 
>>>>> *Date: *Wednesday, June 26, 2019 at 11:00 AM
>>>>> *To: *"Rong, Jialei" 
>>>>> *Cc: *"user @spark" 
>>>>> *Subject: *Re: Change parallelism number in Spark Streaming
>>>>>
>>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>>
>>>>> It's not allowed to change the number of partitions after your
>>>>> streaming query is started.
>>>>>
>>>>>
>>>>>
>>>>> The reason is exactly the number of state stores which is exactly the
>>>>> number of partitions (perhaps multiplied by the number of stateful
>>>>> operators).
>>>>>
>>>>>
>>>>>
>>>>> I think you'll even get a warning or an exception when you change it
>>>>> after restarting the query.
>>>>>
>>>>>
>>>>>
>>>>> The number of partitions is stored in a checkpoint location.
>>>>>
>>>>>
>>>>>
>>>>> Jacek
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
>>>>> wrote:
>>>>>
>>>>> Hi Dear Spark Expert
>>>>>
>>>>>
>>>>>
>>>>> I’m curious about a question regarding Spark Streaming/Structured
>>>>> Streaming: whether it allows to change parallelism number(the default one
>>>>> or the one specified in particular operator) in a stream having stateful
>>>>> transform/operator? Whether this will cause my checkpointed state get
>>>>> messed up?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards
>>>>>
>>>>> Jialei
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jungtaek Lim
Hi,

you could consider the state operator's number of partitions as the "max
parallelism", as parallelism can be reduced by applying coalesce. It
effectively works similarly to key groups.
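
A rough plain-Python sketch of that idea follows (no Spark involved). The
partition count, the crc32 stand-in hash and the contiguous grouping are all
assumptions made for illustration; Spark uses its own hash partitioning and
decides itself how coalesce groups partitions.

```python
import zlib

NUM_STATE_PARTITIONS = 8  # hypothetical; fixed when the query first starts


def state_partition(key, num_partitions=NUM_STATE_PARTITIONS):
    """Map a key to a state partition with a stable stand-in hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def coalesce(num_partitions, num_tasks):
    """Assign every state partition to one of num_tasks tasks (contiguous groups)."""
    if not 1 <= num_tasks <= num_partitions:
        raise ValueError("num_tasks must be between 1 and num_partitions")
    return {p: p * num_tasks // num_partitions for p in range(num_partitions)}


# Running with 3 tasks instead of 8: the state stays split into 8 partitions,
# each task just handles several of them.
assignment = coalesce(NUM_STATE_PARTITIONS, 3)
for partition, task in assignment.items():
    print(f"state partition {partition} -> task {task}")

print("key 'device-1' lives in state partition", state_partition("device-1"))
```

The key never moves to a different state partition; only the number of tasks
working through the partitions changes, which is why the partition count acts
as an upper bound.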

If you're also considering offline queries, there's a tool to manipulate
state which enables reading and writing state in Structured Streaming,
making rescaling and schema evolution possible.

https://github.com/HeartSaVioR/spark-state-tools
(DISCLAIMER: I'm an author of this tool.)

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei 
wrote:

> Thank you for your quick reply!
>
> Is there any plan to improve this?
>
> I asked this question due to some investigation on comparing those
> state-of-the-art streaming systems, among which Flink and DataFlow allow
> changing the parallelism number, and to my knowledge of Spark Streaming, it
> seems it should also be able to do that: if some “key interval” concept is
> used, then state can somehow be decoupled from the partition number by
> consistent hashing.
>
>
>
>
>
> Regards
>
> Jialei
>
>
>
> *From: *Jacek Laskowski 
> *Date: *Wednesday, June 26, 2019 at 11:00 AM
> *To: *"Rong, Jialei" 
> *Cc: *"user @spark" 
> *Subject: *Re: Change parallelism number in Spark Streaming
>
>
>
> Hi,
>
>
>
> It's not allowed to change the number of partitions after your streaming
> query is started.
>
>
>
> The reason is exactly the number of state stores which is exactly the
> number of partitions (perhaps multiplied by the number of stateful
> operators).
>
>
>
> I think you'll even get a warning or an exception when you change it after
> restarting the query.
>
>
>
> The number of partitions is stored in a checkpoint location.
>
>
>
> Jacek
>
>
>
> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
> wrote:
>
> Hi Dear Spark Expert
>
>
>
> I’m curious about a question regarding Spark Streaming/Structured
> Streaming: whether it allows to change parallelism number(the default one
> or the one specified in particular operator) in a stream having stateful
> transform/operator? Whether this will cause my checkpointed state get
> messed up?
>
>
>
>
>
> Regards
>
> Jialei
>
>
>
>


Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jungtaek Lim
Glad to help, Jacek.

I'm happy you're doing a similar thing, which means it could be pretty useful
for others as well. Looks like it might be worthwhile to contribute the state
source and sink. I'll sort out my code and submit a PR.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Thu, Jun 27, 2019 at 7:54 AM Jacek Laskowski  wrote:

> Hi Jungtaek,
>
> That's very helpful to have the state source. As a matter of fact I've
> just this week been working on a similar tool (!) and have been wondering
> how to recreate the schema of the state key and value. You've helped me a
> lot. Thanks.
>
> Jacek
>
> On Wed, 26 Jun 2019, 23:58 Jungtaek Lim,  wrote:
>
>> Hi,
>>
>> you could consider state operator's partition numbers as "max
>> parallelism", as parallelism can be reduced via applying coalesce. It would
>> be effectively working similar as key groups.
>>
>> If you're also considering offline query, there's a tool to manipulate
>> state which enables reading and writing state in structured streaming,
>> achieving rescaling and schema evolution.
>>
>> https://github.com/HeartSaVioR/spark-state-tools
>> (DISCLAIMER: I'm an author of this tool.)
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei 
>> wrote:
>>
>>> Thank you for your quick reply!
>>>
>>> Is there any plan to improve this?
>>>
>>> I asked this question due to some investigation on comparing those state
>>> of art streaming systems, among which Flink and DataFlow allow changing
>>> parallelism number, and by my knowledge of Spark Streaming, it seems it is
>>> also able to do that: if some “key interval” concept is used, then state
>>> can somehow decoupled from partition number by consistent hashing.
>>>
>>>
>>>
>>>
>>>
>>> Regards
>>>
>>> Jialei
>>>
>>>
>>>
>>> *From: *Jacek Laskowski 
>>> *Date: *Wednesday, June 26, 2019 at 11:00 AM
>>> *To: *"Rong, Jialei" 
>>> *Cc: *"user @spark" 
>>> *Subject: *Re: Change parallelism number in Spark Streaming
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> It's not allowed to change the number of partitions after your streaming
>>> query is started.
>>>
>>>
>>>
>>> The reason is exactly the number of state stores which is exactly the
>>> number of partitions (perhaps multiplied by the number of stateful
>>> operators).
>>>
>>>
>>>
>>> I think you'll even get a warning or an exception when you change it
>>> after restarting the query.
>>>
>>>
>>>
>>> The number of partitions is stored in a checkpoint location.
>>>
>>>
>>>
>>> Jacek
>>>
>>>
>>>
>>> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
>>> wrote:
>>>
>>> Hi Dear Spark Expert
>>>
>>>
>>>
>>> I’m curious about a question regarding Spark Streaming/Structured
>>> Streaming: whether it allows to change parallelism number(the default one
>>> or the one specified in particular operator) in a stream having stateful
>>> transform/operator? Whether this will cause my checkpointed state get
>>> messed up?
>>>
>>>
>>>
>>>
>>>
>>> Regards
>>>
>>> Jialei
>>>
>>>
>>>
>>>
>>
>


Re: Spark Standalone - Failing to pass extra java options to the driver in cluster mode

2019-08-19 Thread Jungtaek Lim
Hi Alex,

you seem to have hit SPARK-26606 [1], which has been fixed in 2.4.1. Could
you try it out with the latest version?

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-26606

On Tue, Aug 20, 2019 at 3:43 AM Alex Landa  wrote:

> Hi,
>
> We are using Spark Standalone 2.4.0 in production and publishing our Scala
> app using cluster mode.
> I saw that extra java options passed to the driver don't actually pass.
> A submit example:
> *spark-submit --deploy-mode cluster --master spark://:7077
> --driver-memory 512mb --conf
> "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError" --class
> App  app.jar *
>
> It doesn't pass *-XX:+HeapDumpOnOutOfMemoryError* as a JVM argument, but
> passes this instead:
> *-Dspark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError*
> I created a test app for it:
>
> import java.lang.management.ManagementFactory
>
> import scala.collection.JavaConverters._
>
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder()
>   .master("local")
>   .appName("testApp").getOrCreate()
> import spark.implicits._
>
> // get a RuntimeMXBean reference
> val runtimeMxBean = ManagementFactory.getRuntimeMXBean
>
> // get the jvm's input arguments as a list of strings
> val listOfArguments = runtimeMxBean.getInputArguments
>
> // print the arguments
> listOfArguments.asScala.foreach(a => println(s"ARG: $a"))
>
>
> I see that for client mode I get :
> ARG: -XX:+HeapDumpOnOutOfMemoryError
> while in cluster mode I get:
> ARG: -Dspark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
>
> Would appreciate your help how to work around this issue.
> Thanks,
> Alex
>
>


Kafka offset committer tool for structured streaming query

2019-09-23 Thread Jungtaek Lim
Hi Spark users, especially Structured Streaming users who are using Kafka
as a data source,

I'm pleased to introduce Kafka offset committer, which commits the offsets
of batches that have been processed. The tool is basically an
implementation of a streaming query listener, which listens for events and
commits offsets for each batch. Please refer to README.md in the repository
for more details.
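
To give a feel for what such a listener works with, here is a small
plain-Python sketch. It is not the tool's code: it only assumes that the
Kafka source reports its end offset in the query progress as a JSON string of
the form topic -> partition -> offset, and it flattens that into the tuples a
Kafka client would need for a commit. The sample topics and offsets are made
up, and the commit itself (which needs a Kafka consumer) is left out.

```python
import json


def offsets_to_commit(end_offset_json):
    """Flatten a Kafka end-offset JSON string into (topic, partition, offset) tuples."""
    end_offset = json.loads(end_offset_json)
    result = []
    for topic, partitions in sorted(end_offset.items()):
        # Partition ids arrive as JSON object keys, i.e. strings.
        for partition, offset in sorted(partitions.items(), key=lambda kv: int(kv[0])):
            result.append((topic, int(partition), int(offset)))
    return result


# Hypothetical end offset of one finished batch.
sample = '{"topicA": {"0": 120, "1": 95}, "topicB": {"0": 7}}'

for topic, partition, offset in offsets_to_commit(sample):
    print(f"would commit {topic}-{partition} at offset {offset}")
```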

Here's the GitHub repository for this tool:
https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer
Currently it hasn't been published to Maven Central, so you might need to
build the source and add the jar via the "--jars" option until the artifact
is published.
I'd be happy to hear new ideas for improvements, and contributions are much
appreciated!

Enjoy!

Thanks,
Jungtaek Lim (HeartSaVioR)


Re: pyspark - memory leak leading to OOM after submitting 100 jobs?

2019-10-20 Thread Jungtaek Lim
Honestly I'd recommend that you spend your time looking into the issue by
taking memory dumps at some interval and comparing the differences (or at
least share these dump files with the community, redacted if necessary).
Otherwise someone has to try to reproduce it without a reproducer, and may
fail to reproduce it even after spending their time. A memory leak is not
easy to reproduce unless it leaks objects unconditionally.
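
As a starting point for comparing dumps, here is a minimal plain-Python sketch
that diffs two class histograms and lists the classes that grew the most. It
assumes text in the row layout printed by `jmap -histo` (rank, instance count,
bytes, class name); the class names and numbers in the sample are made up.

```python
import re

# One histogram row: "<rank>: <instances> <bytes> <class name>"
ROW = re.compile(r"^\s*\d+:\s+(\d+)\s+(\d+)\s+(\S+)")


def parse_histogram(text):
    """Return {class name: bytes} for every row that looks like a histogram line."""
    bytes_by_class = {}
    for line in text.splitlines():
        match = ROW.match(line)
        if match:
            bytes_by_class[match.group(3)] = int(match.group(2))
    return bytes_by_class


def top_growth(before, after, limit=5):
    """Classes whose byte count grew between two snapshots, largest growth first."""
    growth = {cls: size - before.get(cls, 0) for cls, size in after.items()}
    grew = [(cls, delta) for cls, delta in growth.items() if delta > 0]
    return sorted(grew, key=lambda item: item[1], reverse=True)[:limit]


# Two hypothetical snapshots taken some minutes apart.
first = """
   1:          1000          64000  org.example.Plan
   2:           500          16000  java.lang.String
"""
second = """
   1:          9000         576000  org.example.Plan
   2:           510          16320  java.lang.String
"""

for cls, delta in top_growth(parse_histogram(first), parse_histogram(second)):
    print(f"{cls}: +{delta} bytes")
```

A class that keeps climbing across many snapshots is a good candidate to look
at in a full heap dump.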

- Jungtaek Lim (HeartSaVioR)

On Sun, Oct 20, 2019 at 7:18 PM Paul Wais  wrote:

> Dear List,
>
> I've observed some sort of memory leak when using pyspark to run ~100
> jobs in local mode.  Each job is essentially a create RDD -> create DF
> -> write DF sort of flow.  The RDD and DFs go out of scope after each
> job completes, hence I call this issue a "memory leak."  Here's
> pseudocode:
>
> ```
> row_rdds = []
> for i in range(100):
>   row_rdd = spark.sparkContext.parallelize([{'a': i} for i in range(1000)])
>   row_rdds.append(row_rdd)
>
> for row_rdd in row_rdds:
>   df = spark.createDataFrame(row_rdd)
>   df.persist()
>   print(df.count())
>   df.write.save(...) # Save parquet
>   df.unpersist()
>
>   # Does not help:
>   # del df
>   # del row_rdd
> ```
>
> In my real application:
>  * rows are much larger, perhaps 1MB each
>  * row_rdds are sized to fit available RAM
>
> I observe that after 100 or so iterations of the second loop (each of
> which creates a "job" in the Spark WebUI), the following happens:
>  * pyspark workers have fairly stable resident and virtual RAM usage
>  * java process eventually approaches resident RAM cap (8GB standard)
> but virtual RAM usage keeps ballooning.
>
> Eventually the machine runs out of RAM and the linux OOM killer kills
> the java process, resulting in an "IndexError: pop from an empty
> deque" error from py4j/java_gateway.py .
>
>
> Does anybody have any ideas about what's going on?  Note that this is
> local mode.  I have personally run standalone masters and submitted a
> ton of jobs and never seen something like this over time.  Those were
> very different jobs, but perhaps this issue is bespoke to local mode?
>
> Emphasis: I did try to del the pyspark objects and run python GC.
> That didn't help at all.
>
> pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)
>
> 12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).
>
> Cheers,
> -Paul
>


Re: apache-spark Structured Stateful Streaming with window / SPARK-21641

2019-10-16 Thread Jungtaek Lim
First of all, I guess you're asking about using both "arbitrary stateful
operation" and "native support of windowing".
(Even if you don't deal with state directly, whenever you use stateful
operations like streaming aggregation or stream-stream join, you use state.)

In short, there's no native support of windowing when you use
flatMapGroupsWithState - actually "arbitrary" here means "low level" -
so you need to implement the window logic yourself. Please refer to the
example below to see how you can deal with windows in flatMapGroupsWithState
(processing time session window):
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

There are two types of APIs in Spark Dataset - "typed" and "untyped". Most
features are available in the untyped API (as you've referred to), but not
all: (flat)MapGroupsWithState, for example, relies on the typed API. The
"window" function is not supported in the typed API, so the two cannot be
used together directly. I guess the restriction was introduced for
simplicity and performance, but I'm not 100% sure.

You can still leverage the "window" function to populate rows with window
bounds, map them via the typed API, and apply groupByKey. Please take a look
at the query below: you'll get more rows than the number of input rows here,
since it applies a sliding window.

val inputData = MemoryStream[(Long, Long)]

inputData.toDF()
  .selectExpr("_1", "CAST(_2 / 1000 AS TIMESTAMP) AS timestamp")
  .select(col("*"), window(col("timestamp"), "10 seconds", "5 seconds").as("window"))
  .select(col("_1"), col("window.start").as("window_start"),
    col("window.end").as("window_end"))
  .as[(Long, Timestamp, Timestamp)]
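
Why the row count grows can be seen in a small plain-Python sketch (no Spark
involved). It uses integer seconds instead of timestamps and assumes windows
start on multiples of the slide interval; the input rows are made up.

```python
def sliding_windows(ts, window=10, slide=5):
    """All [start, end) windows of length `window`, starting on a multiple
    of `slide`, that contain the timestamp `ts`."""
    last_start = ts - ts % slide
    # Walk back from the latest possible start while the window still covers ts.
    starts = range(last_start, ts - window, -slide)
    return sorted((start, start + window) for start in starts)


# Hypothetical input rows: (id, event time in seconds).
rows = [(1, 12), (2, 27)]

expanded = [
    (row_id, start, end)
    for row_id, ts in rows
    for start, end in sliding_windows(ts)
]

for row in expanded:
    print(row)
print(len(rows), "input rows ->", len(expanded), "rows with window bounds")
```

With a 10 second window sliding every 5 seconds, every input row falls into
two windows, so the expanded result has twice as many rows as the input.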

Hope it helps.

Thanks,
Jungtaek Lim (HeartSaVioR)




On Wed, Oct 16, 2019 at 6:36 AM Richard Reitmeyer
 wrote:

> What’s the right way use Structured Streaming with both state and windows?
>
>
>
> Looking at the slides from
> https://www.slideshare.net/databricks/arbitrary-stateful-aggregations-using-structured-streaming-in-apache-spark
>   slides 26 and 31, it looks like stateful processing events for every
> device every minute should be
>
>
>
> events
>
>   .withWatermark(“event_time”, “2 minutes”)
>
>   .groupBy(“device_id”, window(“event_time”, “1 minute”))
>
>   .flatMapWithState(…)(…)
>
>   …
>
>
>
> But with Apache Spark 2.4.4 this won’t work and it looks like
> https://issues.apache.org/jira/browse/SPARK-21641 is to blame.
>
>
>
> What’s the recommended way to handle this?
>
>
>
>
>


Re: chaining flatMapGroupsWithState in append mode

2019-11-28 Thread Jungtaek Lim
That will heavily depend on your implementation of the state functions in
flatMapGroupsWithState, but assuming your state functions respect the
output mode, chaining flatMapGroupsWithState doesn't work - I don't mean it
doesn't compile or doesn't run. I mean the query will not produce correct
output.

The Structured Streaming guide in Spark 3.0 will describe the limitation of
the global watermark, which explains why chaining stateful operators in
Structured Streaming doesn't work correctly. Spark 3.0 will also log a
warning message if it detects the pattern. Reading the new part of the doc
requires prior understanding of the semantics of watermarks and how the
global watermark is calculated, so it is a somewhat advanced topic. (Though
I think end users also need to understand this to make sure their query is
"correct".)
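
A toy plain-Python model of the problem (no Spark involved) may help. It
assumes that the first operator emits a window only once the watermark has
reached the window end, that the second operator uses the window end as the
event time, and that rows at or below the watermark are dropped as late. The
exact boundaries and the sample numbers are assumptions for illustration.

```python
def emit_closed_windows(open_windows, watermark):
    """First stateful operator (append-mode style): emit a window only
    once the watermark has reached its end."""
    return [w for w in open_windows if w["end"] <= watermark]


def accept(rows, watermark, time_field="end"):
    """Second stateful operator: keep only rows that are newer than the
    same, global watermark."""
    return [r for r in rows if r[time_field] > watermark]


# Hypothetical state of the first operator and the current global watermark.
open_windows = [
    {"start": 0, "end": 10, "count": 3},
    {"start": 10, "end": 20, "count": 1},
]
global_watermark = 12

emitted = emit_closed_windows(open_windows, global_watermark)
kept = accept(emitted, global_watermark)

print("emitted by first operator:", emitted)
print("kept by second operator:  ", kept)
```

In this model everything the first operator emits is already behind the
watermark by the time the second operator sees it, so the second operator
keeps nothing.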

You can preview the doc change in this PR:
https://github.com/apache/spark/pull/24890

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Nov 28, 2019 at 9:55 PM alex770  wrote:

> I wrote a few examples chaining flatMapGroupsWithState in append mode and it
> worked. Other examples did not.
>
> The question is if the flatMapGroupsWithState chains in append mode are
> officially supported.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>


Re: Operators supported by Spark Structured Streaming

2019-11-28 Thread Jungtaek Lim
Hi,

Unsupported operations in Structured Streaming are explained in the guide
doc.
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations

Thanks,
Jungtaek Lim (HeartSaVioR)


On Fri, Nov 29, 2019 at 2:08 PM shicheng31...@gmail.com <
shicheng31...@gmail.com> wrote:

> Hi:
> Spark Structured Streaming uses the DataFrame API. When programming,
> there are no compilation errors, but when running, it will report various
> unsupported conditions. The official website does not seem to have a
> document listing the unsupported operators. This is inconvenient when
> developing. How did you solve this problem?
>
> --
> shicheng31...@gmail.com
>


Re: [ANNOUNCE] Announcing Apache Spark 2.4.5

2020-02-10 Thread Jungtaek Lim
Nice work, Dongjoon! Thanks for the huge effort on sorting out the
correctness issues as well.

On Tue, Feb 11, 2020 at 12:40 PM Wenchen Fan  wrote:

> Great Job, Dongjoon!
>
> On Mon, Feb 10, 2020 at 4:18 PM Hyukjin Kwon  wrote:
>
>> Thanks Dongjoon!
>>
>> On Sun, Feb 9, 2020 at 10:49 AM Takeshi Yamamuro wrote:
>>
>>> Happy to hear the release news!
>>>
>>> Bests,
>>> Takeshi
>>>
>>> On Sun, Feb 9, 2020 at 10:28 AM Dongjoon Hyun 
>>> wrote:
>>>
>>>> There was a typo in one URL. The correct release note URL is here.
>>>>
>>>> https://spark.apache.org/releases/spark-release-2-4-5.html
>>>>
>>>> On Sat, Feb 8, 2020 at 5:22 PM Dongjoon Hyun
>>>> wrote:
>>>>
>>>>> We are happy to announce the availability of Spark 2.4.5!
>>>>>
>>>>> Spark 2.4.5 is a maintenance release containing stability fixes. This
>>>>> release is based on the branch-2.4 maintenance branch of Spark. We
>>>>> strongly recommend all 2.4 users to upgrade to this stable release.
>>>>>
>>>>> To download Spark 2.4.5, head over to the download page:
>>>>> http://spark.apache.org/downloads.html
>>>>>
>>>>> Note that you might need to clear your browser cache or
>>>>> to use `Private`/`Incognito` mode according to your browsers.
>>>>>
>>>>> To view the release notes:
>>>>> https://spark.apache.org/releases/spark-release-2.4.5.html
>>>>>
>>>>> We would like to acknowledge all community members for contributing to
>>>>> this release. This release would not have been possible without you.
>>>>>
>>>>> Dongjoon Hyun
>>>>>

>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>


Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-29 Thread Jungtaek Lim
I've investigated a bit, and it looks like this is not an issue with
mapGroupsWithState but with how UDTs are handled in UnsafeRow. One spot
seems to miss handling UDTs, and that gap makes Spark's internal code
corrupt the value. (So if I'm not mistaken, it's a correctness issue.)

I've filed an issue (sorry, I missed that you had already filed one) and
submitted a patch. https://issues.apache.org/jira/browse/SPARK-30993

It would be nice if you could try out my patch and see whether it fixes your
issue (I've already copied your code and made it pass, but would like to
double check). Thanks for reporting!

On Sat, Feb 29, 2020 at 11:26 AM Bryan Jeffrey 
wrote:

>
> Hi Tathagata.
>
> I tried making changes as you suggested:
>
> @SQLUserDefinedType(udt = classOf[JodaTimeUDT])
> class JodaTimeUDT extends UserDefinedType[DateTime] {
>   override def sqlType: DataType  = TimestampType
>
>   override def serialize(obj: DateTime): Long = {
> obj.getMillis
>   }
>
>   def deserialize(datum: Any): DateTime = {
> datum match {
>case value: Long => new DateTime(value, DateTimeZone.UTC)
> }
>   }
>
>   override def userClass: Class[DateTime] = classOf[DateTime]
>
>   private[spark] override def asNullable: JodaTimeUDT = this
> }
>
> object JodaTimeUDTRegister {
>   def register : Unit = {
> UDTRegistration.register(classOf[DateTime].getName, 
> classOf[JodaTimeUDT].getName)
>   }
> }
>
>
> This did not resolve the problem.  The results remain the same:
>
>
> org.scalatest.exceptions.TestFailedException: 
> Array(FooWithDate(*2021-02-02T19:26:23.374Z*,Foo,1), 
> FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same 
> elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), 
> FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>
>
> I included a couple of other test cases to validate that the UDT works fine:
>
>
> "the joda time serializer" should "serialize and deserialize as expected" in {
>   val input = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
>   val serializer = new JodaTimeUDT()
>   val serialized = serializer.serialize(input)
>   val deserialized = serializer.deserialize(serialized)
>
>   deserialized should be(input)
> }
>
> it should "correctly implement dataframe serialization & deserialization in 
> data frames" in {
>   val date = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
>   val datePlusOne = new DateTime(2020,1,2,3,5,5,6, DateTimeZone.UTC)
>   val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
>   val sqlContext = session.sqlContext
>   import sqlContext.implicits._
>   val ds = input.toDF().as[FooWithDate]
>   val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date, 
> Minutes(1)), x.s, x.i + 1)).collect()
>   val expected = List(FooWithDate(datePlusOne, "Foo", 2), 
> FooWithDate(datePlusOne, "Foo", 4))
>
>   result should contain theSameElementsAs expected
> }
>
>
> Any other thoughts?
>
>
> On Fri, Feb 28, 2020 at 6:23 PM Tathagata Das 
> wrote:
>
>> Sounds like something to do with the serialization/deserialization, and
>> not related to mapGroupsWithState.
>>
>>
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
>>
>> The docs says that
>> 1. this is deprecated and therefore should not be used
>> 2. you have to use the annotation `SQLUserDefinedType
>> `
>> on the class definition. You dont seem to have done it, maybe thats the
>> reason?
>>
>> I would debug by printing the values in the serialize/deserialize
>> methods, and then passing it through the groupBy that is known to fail.
>>
>> TD
>>
>> On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey 
>> wrote:
>>
>>> Tathagata,
>>>
>>> The difference is more than hours off. In this instance it's different
>>> by 4 years. In other instances it's different by tens of years (and other
>>> smaller durations).
>>>
>>> We've considered moving to storage as longs, but this makes code much
>>> less readable and harder to maintain. The udt serialization bug also causes
>>> issues outside of stateful streaming, as when executing a simple group by.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>> --
>>> *From:* Tathagata Das 
>>> *Sent:* Friday, February 28, 2020 4:56:07 PM
>>> *To:* Bryan Jeffrey 
>>> *Cc:* user 
>>> *Subject:* Re: Structured Streaming: mapGroupsWithState UDT
>>> serialization does not work
>>>
>>> You are deserializing by explicitly specifying UTC timezone, but when
>>> serializing you are not specifying it. Maybe that is reason?
>>>
>>> Also, if you can encode it using just long, then I recommend just saving
>>> the value as long and eliminating some of the serialization overheads.
>>> Spark will probably better optimize stuff if it sees it as a long rather
>>> than an opaque UDT.

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-29 Thread Jungtaek Lim
Forgot to mention: it only occurs when the SQL type of the UDT has a fixed
length. If the UDT represents a complex type like array, struct, or even
string, it doesn't trigger the issue. So it's an edge case and the chance of
encountering it is probably not that high, which is why the issue only pops
up now even though the relevant code has been around for a very long time.

On Sat, Feb 29, 2020 at 11:44 PM Jungtaek Lim 
wrote:

> I've investigated a bit, and looks like it's not an issue of
> mapGroupsWithState, but an issue of how UDT is handled in UnsafeRow. It
> seems to miss handling UDT and the missing spot makes the internal code of
> Spark corrupt the value. (So if I'm not mistaken, it's a correctness issue.)
>
> I've filed an issue (sorry I missed you've already filed an issue) and
> submitted a patch. https://issues.apache.org/jira/browse/SPARK-30993
>
> It would be nice if you can try out my patch and see whether it fixes your
> issue (I've already copied your code and made it pass, but would like to
> double check). Thanks for reporting!
>
> On Sat, Feb 29, 2020 at 11:26 AM Bryan Jeffrey 
> wrote:
>
>>
>> Hi Tathagata.
>>
>> I tried making changes as you suggested:
>>
>> @SQLUserDefinedType(udt = classOf[JodaTimeUDT])
>> class JodaTimeUDT extends UserDefinedType[DateTime] {
>>   override def sqlType: DataType  = TimestampType
>>
>>   override def serialize(obj: DateTime): Long = {
>> obj.getMillis
>>   }
>>
>>   def deserialize(datum: Any): DateTime = {
>> datum match {
>>case value: Long => new DateTime(value, DateTimeZone.UTC)
>> }
>>   }
>>
>>   override def userClass: Class[DateTime] = classOf[DateTime]
>>
>>   private[spark] override def asNullable: JodaTimeUDT = this
>> }
>>
>> object JodaTimeUDTRegister {
>>   def register : Unit = {
>> UDTRegistration.register(classOf[DateTime].getName, 
>> classOf[JodaTimeUDT].getName)
>>   }
>> }
>>
>>
>> This did not resolve the problem.  The results remain the same:
>>
>>
>> org.scalatest.exceptions.TestFailedException: 
>> Array(FooWithDate(*2021-02-02T19:26:23.374Z*,Foo,1), 
>> FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same 
>> elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), 
>> FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>>
>>
>> I included a couple of other test cases to validate that the UDT works fine:
>>
>>
>> "the joda time serializer" should "serialize and deserialize as expected" in 
>> {
>>   val input = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
>>   val serializer = new JodaTimeUDT()
>>   val serialized = serializer.serialize(input)
>>   val deserialized = serializer.deserialize(serialized)
>>
>>   deserialized should be(input)
>> }
>>
>> it should "correctly implement dataframe serialization & deserialization in 
>> data frames" in {
>>   val date = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
>>   val datePlusOne = new DateTime(2020,1,2,3,5,5,6, DateTimeZone.UTC)
>>   val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
>>   val sqlContext = session.sqlContext
>>   import sqlContext.implicits._
>>   val ds = input.toDF().as[FooWithDate]
>>   val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date, 
>> Minutes(1)), x.s, x.i + 1)).collect()
>>   val expected = List(FooWithDate(datePlusOne, "Foo", 2), 
>> FooWithDate(datePlusOne, "Foo", 4))
>>
>>   result should contain theSameElementsAs expected
>> }
>>
>>
>> Any other thoughts?
>>
>>
>> On Fri, Feb 28, 2020 at 6:23 PM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Sounds like something to do with the serialization/deserialization, and
>>> not related to mapGroupsWithState.
>>>
>>>
>>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
>>>
>>> The docs says that
>>> 1. this is deprecated and therefore should not be used
>>> 2. you have to use the annotation `SQLUserDefinedType
>>> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>`
>>> on the class definition. You dont seem to have done it, maybe thats the
>>> reason?
>>>
>>> I would debug by printing the values in the serialize/deserialize
>>> methods

Re: Extract value from streaming Dataframe to a variable

2020-02-02 Thread Jungtaek Lim
`foreachBatch` was added in Spark 2.4.x if I understand correctly, so
in any language you'll need to upgrade Spark to 2.4.x to use
`foreachBatch`. PySpark is covered as well.

https://issues.apache.org/jira/browse/SPARK-24565
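To make the suggestion concrete, here is a minimal PySpark-style sketch, assuming Spark 2.4+ and a streaming DataFrame with an `agentName` array column as in the original question. The helper names `make_batch_handler` and `start_query` and the `results` dict are hypothetical, introduced only for illustration.

```python
def make_batch_handler(results):
    """Build a foreachBatch callback that records one agent name per micro-batch.

    `results` is a hypothetical dict used here only to hand values back to the caller.
    """
    def handle_batch(batch_df, batch_id):
        # Inside foreachBatch the argument is a regular (non-streaming) DataFrame,
        # so batch-only actions such as collect() are allowed here.
        rows = (batch_df
                .select(batch_df.agentName.getItem(0).alias("agentName"))
                .limit(1)
                .collect())
        if rows:
            # Plain Python value; this is where a REST call could be made.
            results[batch_id] = rows[0][0]
    return handle_batch


def start_query(streaming_df, results):
    # `streaming_df` stands in for the thread's kinesisDF (assumption).
    return (streaming_df.writeStream
            .foreachBatch(make_batch_handler(results))
            .start())
```

The value is extracted per micro-batch rather than once for the whole stream, so any follow-up check has to live inside (or be triggered from) the callback.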


On Wed, Jan 22, 2020 at 1:12 AM Nick Dawes  wrote:

> Thanks for your reply.
>
> I'm using Spark 2.3.2. Looks like foreach operation is only supported for
> Java and Scala. Is there any alternative for Python?
>
> On Mon, Jan 20, 2020, 5:09 PM Jungtaek Lim 
> wrote:
>
>> Hi,
>>
>> you can try out foreachBatch to apply the batch query operation to the
>> each output of micro-batch:
>>
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>>
>> On Mon, Jan 20, 2020 at 8:43 PM Nick Dawes  wrote:
>>
>>> Streaming experts, any clues how to achieve this?
>>>
>>> After extracting few variables, I need to run them through a REST API
>>> for verification and decision making.
>>>
>>> Thanks for your help.
>>>
>>> Nick
>>>
>>> On Fri, Jan 17, 2020, 6:27 PM Nick Dawes  wrote:
>>>
>>>> I need to extract a value from a PySpark structured streaming Dataframe
>>>> to a string variable to check something.
>>>>
>>>> I tried this code.
>>>>
>>>> agentName =
>>>> kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]
>>>>
>>>> This works on a non-streaming Dataframe only. In a streaming Dataframe,
>>>> collect is not supported.
>>>>
>>>> Any workaround for this?
>>>>
>>>> Nick
>>>>
>>>>
>>>>


Re: How to prevent and track data loss/dropped due to watermark during structure streaming aggregation

2020-02-02 Thread Jungtaek Lim
Have you tried printing the timestamps of the rows in each batch, along with
the watermark, while adding an artificial delay to batch processing?

First of all, you're technically using "processing time" in your query,
where theoretically you will never have "late events". The watermark exists
to handle out-of-order events, so you won't need it.

If Spark requires the watermark for technical reasons, you can just
set it to 0 and no events should be lost.

> So, let's say during shuffle stage (groupby) or write stage, we have a
> delay of 5 to 10 minutes, will we lose data due to watermark of 2 minutes
> here?

If your batch is delayed, the timestamps in the data are delayed as well,
because they follow "processing time". No data will be lost, but
as you're relying on processing time, the result can be affected by various
circumstances.

With a 3-minute window and 5 to 10 minutes of batch delay, the grouping
would effectively apply only within a single batch. Applying a watermark
here doesn't help the situation; it just slows down the output unnecessarily.

That's the power of "event time" processing: you'll get consistent results
even with delays, out-of-order events, etc. The trade-off you've described
(delayed output vs. discarded late events) actually applies to "event time"
processing.
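A rough way to see the difference is a simplified model of watermarking in plain Python. This is only a sketch: real Spark advances the watermark between micro-batches rather than per event, and the function name `run_with_watermark` and the sample timestamps are made up for illustration.

```python
def run_with_watermark(timestamps, delay):
    """Simplified model: an event counts as late (and is dropped) when its
    timestamp is older than (max timestamp seen so far - delay)."""
    max_seen = None
    kept, dropped = [], []
    for ts in timestamps:
        if max_seen is not None and ts < max_seen - delay:
            dropped.append(ts)
        else:
            kept.append(ts)
        max_seen = ts if max_seen is None else max(max_seen, ts)
    return kept, dropped


# Event time (seconds): the event stamped 30 arrives after the one stamped 300,
# so with a 2-minute watermark it is considered late.
event_kept, event_dropped = run_with_watermark([0, 60, 300, 30, 310], delay=120)

# Processing time: timestamps are assigned on arrival and never go backwards,
# so nothing is late even with a watermark of 0 and long gaps between batches.
proc_kept, proc_dropped = run_with_watermark([0, 5, 605, 606, 1200], delay=0)
```

In this model only the event-time run drops anything, which matches the point above: with processing time the watermark has nothing to discard.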

Hope this helps.
Jungtaek Lim (HeartSaVioR)

On Fri, Jan 24, 2020 at 7:19 AM stevech.hu  wrote:

> Anyone know the answers or pointers? thanks.
>
>
>
>
>


Re: Extract value from streaming Dataframe to a variable

2020-01-20 Thread Jungtaek Lim
Hi,

You can try foreachBatch to apply batch query operations to the output of
each micro-batch:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Jan 20, 2020 at 8:43 PM Nick Dawes  wrote:

> Streaming experts, any clues how to achieve this?
>
> After extracting few variables, I need to run them through a REST API for
> verification and decision making.
>
> Thanks for your help.
>
> Nick
>
> On Fri, Jan 17, 2020, 6:27 PM Nick Dawes  wrote:
>
>> I need to extract a value from a PySpark structured streaming Dataframe
>> to a string variable to check something.
>>
>> I tried this code.
>>
>> agentName =
>> kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]
>>
>> This works on a non-streaming Dataframe only. In a streaming Dataframe,
>> collect is not supported.
>>
>> Any workaround for this?
>>
>> Nick
>>
>>
>>


Re: [ANNOUNCE] Announcing Apache Spark 3.0.0-preview2

2019-12-24 Thread Jungtaek Lim
Great work, Yuming! Happy Holidays.

On Wed, Dec 25, 2019 at 9:08 AM Dongjoon Hyun 
wrote:

> Indeed! Thank you again, Yuming and all.
>
> Bests,
> Dongjoon.
>
>
> On Tue, Dec 24, 2019 at 13:38 Takeshi Yamamuro 
> wrote:
>
>> Great work, Yuming!
>>
>> Bests,
>> Takeshi
>>
>> On Wed, Dec 25, 2019 at 6:00 AM Xiao Li  wrote:
>>
>>> Thank you all. Happy Holidays!
>>>
>>> Xiao
>>>
>>> On Tue, Dec 24, 2019 at 12:53 PM Yuming Wang  wrote:
>>>
>>>> Hi all,
>>>>
>>>> To enable wide-scale community testing of the upcoming Spark 3.0
>>>> release, the Apache Spark community has posted a new preview release of
>>>> Spark 3.0. This preview is *not a stable release in terms of either
>>>> API or functionality*, but it is meant to give the community early
>>>> access to try the code that will become Spark 3.0. If you would like to
>>>> test the release, please download it, and send feedback using either the
>>>> mailing lists or JIRA.
>>>>
>>>> There are a lot of exciting new features added to Spark 3.0, including
>>>> Dynamic Partition Pruning, Adaptive Query Execution, Accelerator-aware
>>>> Scheduling, Data Source API with Catalog Supports, Vectorization in SparkR,
>>>> support of Hadoop 3/JDK 11/Scala 2.12, and many more. For a full list of
>>>> major features and changes in Spark 3.0.0-preview2, please check the
>>>> threads (
>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-3-0-preview-release-feature-list-and-major-changes-td28050.html
>>>> and
>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-3-0-preview-release-2-td28491.html
>>>> ).
>>>>
>>>> We'd like to thank our contributors and users for their contributions
>>>> and early feedback to this release. This release would not have been
>>>> possible without you.
>>>>
>>>> To download Spark 3.0.0-preview2, head over to the download page:
>>>> https://archive.apache.org/dist/spark/spark-3.0.0-preview2
>>>>
>>>> Happy Holidays.
>>>>
>>>> Yuming

>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Jungtaek Lim
I think Spark is trying to ensure that it reads the input "continuously"
without missing anything. Technically it is valid to call the situation a
kind of "data loss", as the query couldn't process the offsets that were
thrown out, and the owner of the query needs to be careful because it
affects the result.

If your streaming query keeps up with the input rate, it's pretty rare for
the query to fall behind retention. Even if it lags a bit, it is safe as
long as the retention period is long enough. The ideal state is to ensure
your query processes all offsets before retention throws them out (don't
leave the query lagging behind: either increase processing power or
increase the retention duration, though most probably you'll need to do the
former). But if you can't guarantee that and you understand the risk, then
yes, you can turn off the option and take the risk.
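For reference, turning the option off on the Kafka source looks roughly like the sketch below. The function name `kafka_source` and its parameters are placeholders for illustration; `failOnDataLoss` is the option discussed in this thread.

```python
def kafka_source(spark, bootstrap_servers, topic):
    """Build a Kafka streaming source that does not fail the query when
    offsets have already been removed by retention.

    `spark` is an existing SparkSession; server and topic values are placeholders.
    """
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("subscribe", topic)
            # Accept the risk described above: offsets lost to retention are skipped.
            .option("failOnDataLoss", "false")
            .load())
```

With this setting the skipped offsets are simply not processed, so the result no longer reflects the full input.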


On Wed, Apr 15, 2020 at 9:24 AM Ruijing Li  wrote:

> I see, I wasn’t sure if that would work as expected. The docs seems to
> suggest to be careful before turning off that option, and I’m not sure why
> failOnDataLoss is true by default.
>
> On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz  wrote:
>
>> Just set `failOnDataLoss=false` as an option in readStream?
>>
>> On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li  wrote:
>>
>>> Hi all,
>>>
>>> I have a spark structured streaming app that is consuming from a kafka
>>> topic with retention set up. Sometimes I face an issue where my query has
>>> not finished processing a message but the retention kicks in and deletes
>>> the offset, which since I use the default setting of “failOnDataLoss=true”
>>> causes my query to fail. The solution I currently have is manual, deleting
>>> the offsets directory and rerunning.
>>>
>>> I instead like to have spark automatically fall back to the earliest
>>> offset available. The solutions I saw recommend setting auto.offset =
>>> earliest, but for structured streaming, you cannot set that. How do I do
>>> this for structured streaming?
>>>
>>> Thanks!
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>


Re: Understanding spark structured streaming checkpointing system

2020-04-19 Thread Jungtaek Lim
That sounds odd. Is it intermittent, or always reproducible if you start
with the same checkpoint? What version of Spark are you using?

On Fri, Apr 17, 2020 at 6:17 AM Ruijing Li  wrote:

> Hi all,
>
> I have a question on how structured streaming does checkpointing. I’m
> noticing that spark is not reading from the max / latest offset it’s seen.
> For example, in HDFS, I see it stored offset file 30 which contains
> partition: offset {1: 2000}
>
> But instead after stopping the job and restarting it, I see it instead
> reads from offset file 9 which contains {1:1000}
>
> Can someone explain why spark doesn’t take the max offset?
>
> Thanks.
> --
> Cheers,
> Ruijing Li
>


Re: Spark structured streaming - Fallback to earliest offset

2020-04-19 Thread Jungtaek Lim
You may want to check "where" the job is stuck by taking a thread dump: it
could be in the Kafka consumer, in the Spark codebase, etc. Without that
information it's hard to say.

On Thu, Apr 16, 2020 at 4:22 PM Ruijing Li  wrote:

> Thanks Jungtaek, that makes sense.
>
> I tried Burak’s solution of just turning failOnDataLoss to be false, but
> instead of failing, the job is stuck. I’m guessing that the offsets are
> being deleted faster than the job can process them and it will be stuck
> unless I increase resources? Or does once the exception happen, spark will
> hang?
>
> On Tue, Apr 14, 2020 at 10:48 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> I think Spark is trying to ensure that it reads the input "continuously"
>> without any missing. Technically it may be valid to say the situation is a
>> kind of "data-loss", as the query couldn't process the offsets which are
>> being thrown out, and owner of the query needs to be careful as it affects
>> the result.
>>
>> If your streaming query keeps up with input rate then it's pretty rare
>> for the query to go under retention. Even it lags a bit, it'd be safe if
>> retention is set to enough period. The ideal state would be ensuring your
>> query to process all offsets before they are thrown out by retention (don't
>> leave the query lagging behind - either increasing processing power or
>> increasing retention duration, though most probably you'll need to do
>> former), but if you can't make sure and if you understand the risk then yes
>> you can turn off the option and take the risk.
>>
>>
>> On Wed, Apr 15, 2020 at 9:24 AM Ruijing Li  wrote:
>>
>>> I see, I wasn’t sure if that would work as expected. The docs seems to
>>> suggest to be careful before turning off that option, and I’m not sure why
>>> failOnDataLoss is true by default.
>>>
>>> On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz  wrote:
>>>
>>>> Just set `failOnDataLoss=false` as an option in readStream?
>>>>
>>>> On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have a spark structured streaming app that is consuming from a kafka
>>>>> topic with retention set up. Sometimes I face an issue where my query has
>>>>> not finished processing a message but the retention kicks in and deletes
>>>>> the offset, which since I use the default setting of “failOnDataLoss=true”
>>>>> causes my query to fail. The solution I currently have is manual, deleting
>>>>> the offsets directory and rerunning.
>>>>>
>>>>> I instead like to have spark automatically fall back to the earliest
>>>>> offset available. The solutions I saw recommend setting auto.offset =
>>>>> earliest, but for structured streaming, you cannot set that. How do I do
>>>>> this for structured streaming?
>>>>>
>>>>> Thanks!
>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>>
>>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>


Re: Using startingOffsets latest - no data from structured streaming kafka query

2020-04-19 Thread Jungtaek Lim
Did you provide more records to the topic "after" you started the query?
That's the only cause I can think of based on the information given.

On Fri, Apr 17, 2020 at 9:13 AM Ruijing Li  wrote:

> Hi all,
>
> Apologies if this has been asked before, but I could not find the answer
> to this question. We have a structured streaming job, but for some reason,
> if we use startingOffsets = latest with foreachbatch mode, it doesn’t
> produce any data.
>
> Rather, in logs I see it repeats the message “ Fetcher [Consumer]
> Resetting offset for partition to offset” over and over again..
>
> However with startingOffsets=earliest, we don’t get this issue. I’m
> wondering then how we can use startingOffsets=latest as I wish to start
> from the latest offset available.
> --
> Cheers,
> Ruijing Li
>


Re: [Structured Streaming] Checkpoint file compact file grows big

2020-04-19 Thread Jungtaek Lim
Deleting the latest .compact file would lose the exactly-once guarantee
and make Spark fail to read from the output directory. If you're reading
the output directory with something other than Spark, the metadata in the
output directory doesn't matter, but then there's no exactly-once
(exactly-once is achieved by leveraging the metadata, which only Spark can
read).

Btw, what you've encountered is one of the known issues with the file stream
sink. Two different JIRA issues have been filed for it so far (both
reported by end users):

https://issues.apache.org/jira/browse/SPARK-24295
https://issues.apache.org/jira/browse/SPARK-29995

I've proposed retention of output files in the file stream sink, but the
proposal hasn't got much love. (That means it's not guaranteed to be addressed.)

https://issues.apache.org/jira/browse/SPARK-27188

Given the patch is stale, I'm planning to rework it on top of the latest
master soon.

Btw, I've also proposed other improvements to help address latency
issues in the file stream source & file stream sink, but they haven't got
much love from committers either (no guarantee they'll be addressed):

https://issues.apache.org/jira/browse/SPARK-30804
https://issues.apache.org/jira/browse/SPARK-30866
https://issues.apache.org/jira/browse/SPARK-30900
https://issues.apache.org/jira/browse/SPARK-30915
https://issues.apache.org/jira/browse/SPARK-30946

SPARK-30946 is closely related to this issue: it makes the checkpoint file
much smaller and the compaction much faster. The gain depends on the
compression ratio, but compaction could be 5 times faster and the file 80%
smaller (1/5 of the original size), which would greatly push back the
point where things go bad, even without TTL. Say you reached the bad state
in 2 weeks; with the patch it would take 8 weeks more (10 weeks in total).

That said, it doesn't completely remove the need for TTL, but it opens the
chance to use a longer TTL without hitting the bad state.
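The 2 weeks vs. 10 weeks estimate can be spelled out as below. It is back-of-the-envelope arithmetic only, assuming the file grows linearly over time and that the "bad state" is reached at a fixed file size; the function name is made up for this sketch.

```python
from fractions import Fraction


def weeks_until_bad_state(baseline_weeks, size_ratio):
    """Time to reach the same file size when each entry takes `size_ratio`
    of its original space (assumes linear growth and a fixed size threshold)."""
    return Fraction(baseline_weeks) / size_ratio


size_ratio = Fraction(1, 5)                         # 80% smaller = 1/5 of the original
with_patch = weeks_until_bad_state(2, size_ratio)   # baseline: bad state after 2 weeks
extra_weeks = with_patch - 2
print(with_patch, extra_weeks)
```

A different compression ratio changes the factor proportionally, which is why the actual gain depends on the data.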

If you're adventurous you can apply these patches on your version of Spark
and see whether it helps.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Thu, Apr 16, 2020 at 9:24 AM Ahn, Daniel 
wrote:

> Are Spark Structured Streaming checkpoint files expected to grow over time
> indefinitely? Is there a recommended way to safely age-off old checkpoint
> data?
>
>
>
> Currently we have a Spark Structured Streaming process reading from Kafka
> and writing to an HDFS sink, with checkpointing enabled and writing to a
> location on HDFS. This streaming application has been running for 4 months
> and over time we have noticed that with every 10th job within the
> application there is about a 5 minute delay between when a job finishes and
> the next job starts which we have attributed to the checkpoint compaction
> process. At this point the .compact file that is written is about 2GB in
> size and the contents of the file show it keeps track of files it processed
> at the very origin of the streaming application.
>
>
>
> This issue can be reproduced with any Spark Structured Streaming process
> that writes checkpoint files.
>
>
>
> Is the best approach for handling the growth of these files to simply
> delete the latest .compact file within the checkpoint directory, and are
> there any associated risks with doing so?
>
>
>
>
>


Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-16 Thread Jungtaek Lim
Take thread dumps continuously at a fixed interval (like 1s) and watch how
the stack / locks of each thread change. (This is not easy to do in the UI,
so doing it manually may be the only option. I'm not sure whether the Spark
UI provides the same; I haven't used it for this.)

That will tell you which thread is blocked (even if it's shown as running)
and which point to look at.
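One way to compare consecutive dumps is sketched below in Python, assuming the dumps were captured as text (for example with `jstack <pid>` once per second) and that each thread section starts with a quoted thread name and sections are separated by blank lines. The function names are hypothetical.

```python
def parse_thread_dump(text):
    """Map thread name -> stack text (state line plus frames) for one dump."""
    stacks = {}
    for block in text.strip().split("\n\n"):
        lines = block.strip().splitlines()
        if not lines or not lines[0].startswith('"'):
            continue  # skip the dump header and other non-thread sections
        name = lines[0].split('"')[1]
        # Ignore the first line of each section: it can carry counters
        # that change between dumps even when the stack does not.
        stacks[name] = "\n".join(line.strip() for line in lines[1:])
    return stacks


def unchanged_threads(dumps):
    """Names of threads whose stack is identical in every dump (needs >= 1 dump)."""
    parsed = [parse_thread_dump(d) for d in dumps]
    common = set(parsed[0]).intersection(*parsed[1:])
    return sorted(n for n in common if len({p[n] for p in parsed}) == 1)
```

Idle pool threads will also show up as unchanged, so the list is only a starting point; the frames of each listed thread still need to be read to see what it is waiting on.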

On Thu, Apr 16, 2020 at 4:29 PM Ruijing Li  wrote:

> Once I do. thread dump, what should I be looking for to tell where it is
> hanging? Seeing a lot of timed_waiting and waiting on driver. Driver is
> also being blocked by spark UI. If there are no tasks, is there a point to
> do thread dump of executors?
>
> On Tue, Apr 14, 2020 at 4:49 AM Gabor Somogyi 
> wrote:
>
>> The simplest way is to do thread dump which doesn't require any fancy
>> tool (it's available on Spark UI).
>> Without thread dump it's hard to say anything...
>>
>>
>> On Tue, Apr 14, 2020 at 11:32 AM jane thorpe 
>> wrote:
>>
>>> Here a is another tool I use Logic Analyser  7:55
>>> https://youtu.be/LnzuMJLZRdU
>>>
>>> you could take some suggestions for improving performance  queries.
>>> https://dzone.com/articles/why-you-should-not-use-select-in-sql-query-1
>>>
>>>
>>> Jane thorpe
>>> janethor...@aol.com
>>>
>>>
>>> -Original Message-
>>> From: jane thorpe 
>>> To: janethorpe1 ; mich.talebzadeh <
>>> mich.talebza...@gmail.com>; liruijing09 ; user <
>>> user@spark.apache.org>
>>> Sent: Mon, 13 Apr 2020 8:32
>>> Subject: Re: Spark hangs while reading from jdbc - does nothing Removing
>>> Guess work from trouble shooting
>>>
>>>
>>>
>>> This tool may be useful for you to trouble shoot your problems away.
>>>
>>>
>>> https://www.javacodegeeks.com/2020/04/simplifying-apm-remove-the-guesswork-from-troubleshooting.html
>>>
>>>
>>> "APM tools typically use a waterfall-type view to show the blocking
>>> time of different components cascading through the control flow within an
>>> application.
>>> These types of visualizations are useful, and AppOptics has them, but
>>> they can be difficult to understand for those of us without a PhD."
>>>
>>> Especially  helpful if you want to understand through visualisation and
>>> you do not have a phD.
>>>
>>>
>>> Jane thorpe
>>> janethor...@aol.com
>>>
>>>
>>> -Original Message-
>>> From: jane thorpe 
>>> To: mich.talebzadeh ; liruijing09 <
>>> liruijin...@gmail.com>; user 
>>> CC: user 
>>> Sent: Sun, 12 Apr 2020 4:35
>>> Subject: Re: Spark hangs while reading from jdbc - does nothing
>>>
>>> You seem to be implying the error is intermittent.
>>> You seem to be implying data is being ingested  via JDBC. So the
>>> connection has proven itself to be working unless no data is arriving from
>>> the  JDBC channel at all.  If no data is arriving then one could say it
>>> could be  the JDBC.
>>> If the error is intermittent  then it is likely a resource involved in
>>> processing is filling to capacity.
>>> Try reducing the data ingestion volume and see if that completes, then
>>> increase the data ingested  incrementally.
>>> I assume you have  run the job on small amount of data so you have
>>> completed your prototype stage successfully.
>>>
>>> --
>>> On Saturday, 11 April 2020 Mich Talebzadeh 
>>> wrote:
>>> Hi,
>>>
>>> Have you checked your JDBC connections from Spark to Oracle. What is
>>> Oracle saying? Is it doing anything or hanging?
>>>
>>> set pagesize 
>>> set linesize 140
>>> set heading off
>>> select SUBSTR(name,1,8) || ' sessions as on '||TO_CHAR(CURRENT_DATE,
>>> 'MON DD  HH:MI AM') from v$database;
>>> set heading on
>>> column spid heading "OS PID" format a6
>>> column process format a13 heading "Client ProcID"
>>> column username  format a15
>>> column sid   format 999
>>> column serial#   format 9
>>> column STATUSformat a3 HEADING 'ACT'
>>> column last  format 9,999.99
>>> column TotGets   format 999,999,999,999 HEADING 'Logical I/O'
>>> column phyRdsformat 999,999,999 HEADING 'Physical I/O'
>>> column total_memory format 999,999,999 HEADING 'MEM/KB'
>>> --
>>> SELECT
>>>   substr(a.username,1,15) "LOGIN"
>>> , substr(a.sid,1,5) || ','||substr(a.serial#,1,5) AS
>>> "SID/serial#"
>>> , TO_CHAR(a.logon_time, 'DD/MM HH:MI') "LOGGED IN SINCE"
>>> , substr(a.machine,1,10) HOST
>>> , substr(p.username,1,8)||'/'||substr(p.spid,1,5) "OS PID"
>>> , substr(a.osuser,1,8)||'/'||substr(a.process,1,5) "Client PID"
>>> , substr(a.program,1,15) PROGRAM
>>> --,ROUND((CURRENT_DATE-a.logon_time)*24) AS "Logged/Hours"
>>> , (
>>> select round(sum(ss.value)/1024) from v$sesstat ss,
>>> v$statname sn
>>> where ss.sid = a.sid and
>>> sn.statistic# = ss.statistic# and
>>> -- sn.name in ('session pga memory')
>>> sn.name in ('session pga memory','session uga
>>> memory')
>>>   ) AS total_memory
>>>

Re: spark structured streaming GroupState returns weird values from sate

2020-03-27 Thread Jungtaek Lim
Could you play with Encoders.bean()? You can call Encoders.bean() with your
class, and then call .schema() on the returned encoder to see how the class
is transformed into a Spark SQL schema. The schema must be consistent across
multiple JVM runs for this to work properly, but I suspect it doesn't retain
the field order.
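To illustrate why an unstable field order would produce this kind of symptom, here is a plain Python sketch. It does not reproduce Spark's encoder or state store; `encode`, `decode`, and the two field orders are made up, and only the sample values come from the report.

```python
def encode(record, field_order):
    """Serialize a record as a bare list of values, in the given field order."""
    return [record[name] for name in field_order]


def decode(values, field_order):
    """Rebuild a record by pairing values with field names purely by position."""
    return dict(zip(field_order, values))


session = {"productId": "222112345", "priority": "0",
           "firstEventTimeMillis": 1585312384,
           "firstReceivedTimeMillis": 1585312401693}

# Hypothetical orders: the writer and the reader disagree on field positions.
write_order = ["productId", "priority", "firstEventTimeMillis", "firstReceivedTimeMillis"]
read_order = ["priority", "productId", "firstReceivedTimeMillis", "firstEventTimeMillis"]

same = decode(encode(session, write_order), write_order)    # round-trips cleanly
mixed = decode(encode(session, write_order), read_order)    # values land in the wrong fields
```

In `mixed`, `priority` holds the product id and the two time fields are swapped, which resembles the interchanged values in the report. Comparing the `.schema()` output across runs would show whether this is actually what happens.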

On Fri, Mar 27, 2020 at 10:28 PM Srinivas V  wrote:

> I am listening to a Kafka topic with a Structured Streaming application
> written in Java, testing it on my local Mac.
> When I retrieve the GroupState object with state.get(), it returns
> seemingly random values for the fields in the object: some are swapped,
> some are defaults, and some are junk values.
>
> See the example below.
> When setting the state, I set:
> ProductSessionInformation{requestId='222112345', productId='222112345',
> priority='0', firstEventTimeMillis=1585312384,
> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
> numberOfEvents=1}
>
> When I retrieve it, it comes back like this:
> ProductSessionInformation{requestId='some junk characters are coming here'
> productId='222112345', priority='222112345',
> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>
> Any clue why this might be happening? I have been stuck on this for a couple
> of days. Immediate help is appreciated.
>
> code snippet:
>
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<String,
>         Event, ProductStateInformation, ProductSessionUpdate> {
>
>     @Override
>     public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator,
>             GroupState<ProductStateInformation> state) throws Exception {
>
>         if (state.hasTimedOut()) {
>             //
>         } else {
>             if (state.exists()) {
>                 ProductStateInformation oldSession = state.get();
>                 System.out.println("State for productId:" + productId
>                         + " with old values " + oldSession);
>             }
>
>
> public class EventsApp implements Serializable {
>
>     public void run(String[] args) throws Exception {
>
>         ...
>
>         Dataset<Row> dataSet = sparkSession
>             .readStream()
>             .format("kafka")
>             .option("kafka.bootstrap.servers", "localhost")
>             .option("startingOffsets", "latest")
>             .option("failOnDataLoss", "false")
>             .option("subscribe", "topic1,topic2")
>             .option("includeTimestamp", true)
>             .load();
>
>         eventsDS.groupByKey(
>                 new MapFunction<Event, String>() {
>                     @Override public String call(Event event) {
>                         return event.getProductId();
>                     }
>                 }, Encoders.STRING())
>             .mapGroupsWithState(
>                 new StateUpdateTask(3),
>                 Encoders.bean(ProductSessionInformation.class),
>                 Encoders.bean(ProductSessionUpdate.class),
>                 GroupStateTimeout.ProcessingTimeTimeout());
>
>         ...
>
>         StreamingQuery query = productUpdates
>             .writeStream()
>             .foreach(new ForeachWriter<ProductSessionUpdate>() {
>                 @Override
>                 public boolean open(long l, long l1) { return true; }
>
>                 @Override
>                 public void process(ProductSessionUpdate productSessionUpdate) {
>                     logger.info("-> query process: " + productSessionUpdate);
>                 }
>
>                 @Override
>                 public void close(Throwable throwable) {}
>             })
>             .outputMode("update")
>             .option("checkpointLocation", checkpointDir)
>             .start();
>
>         query.awaitTermination();
>     }
> }
>
> public class ProductStateInformation implements Serializable {
>
>     protected String requestId;
>     protected String productId;
>     protected String priority;
>     protected long firstEventTimeMillis;
>     protected long lastEventTimeMillis;
>     protected long firstReceivedTimeMillis;
>     protected int numberOfEvents;
>
>     ... // getters and setters
>
> }
>
> These are the versions I am using:
>
> 2.3.1
> 2.4.3
>
> 2.6.60.10.2.0
>
> 3.0.3
>
>


Re: spark structured streaming GroupState returns weird values from state

2020-03-27 Thread Jungtaek Lim
Well, the code itself doesn't seem to be OK - you're using
ProductStateInformation as the class of State whereas you provide
ProductSessionInformation to Encoder for State.



Re: spark structured streaming GroupState returns weird values from state

2020-03-28 Thread Jungtaek Lim
I haven't heard of a known issue like this. That said, it may need a fresh
investigation, which is impossible or takes huge effort without a simple
reproducer.

Contributors (who are basically volunteers) may not want to struggle to
reproduce the problem from partial information. If you are stuck and have to
resolve it, I'd recommend spending your time on a simple reproducer to help
them.

Could you please strip out the business logic you want to redact, and
provide the full source code that reproduces the bug?

On Sat, Mar 28, 2020 at 8:11 PM Srinivas V  wrote:

> Sorry for the typos, correcting them below.
>
> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:
>
>> Sorry, I was just changing some names so as not to send the exact names.
>> Please ignore that. I have really been struggling with this for a couple
>> of days. Can this happen due to
>> 1. some of the values being null,
>> 2. a UTF-8 issue, or some serialization/deserialization issue, or
>> 3. not enough memory?
>> BTW, I am using the same names in my code.
>>

Re: Spark Streaming Code

2020-03-28 Thread Jungtaek Lim
To get meaningful answers, please provide as much information and context
as possible, e.g. the Spark version, which behavior/output you expected
(and why), and how the query actually behaves.

On Sun, Mar 29, 2020 at 3:37 AM Siva Samraj  wrote:

> Hi Team,
>
> I need help with the windowing and watermark concepts. This code is not
> working as expected.
>
> package com.jiomoney.streaming
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.ProcessingTime
>
> object SlingStreaming {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder()
>       .master("local[*]")
>       .appName("Coupons_ViewingNow")
>       .getOrCreate()
>
>     import spark.implicits._
>
>     val checkpoint_path = "/opt/checkpoints/"
>
>     val ks = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("subscribe", "test")
>       .option("startingOffsets", "latest")
>       .option("failOnDataLoss", "false")
>       .option("kafka.replica.fetch.max.bytes", "16777216")
>       .load()
>
>     val dfDeviceid = ks
>       .withColumn("val", ($"value").cast("string"))
>       .withColumn("count1", get_json_object(($"val"), "$.a"))
>       .withColumn("deviceId", get_json_object(($"val"), "$.b"))
>       .withColumn("timestamp", current_timestamp())
>
>     val final_ids = dfDeviceid
>       .withColumn("processing_time", current_timestamp())
>       .withWatermark("processing_time", "1 minutes")
>       .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
>       .agg(sum($"count1") as "total")
>
>     val t = final_ids
>       .select(to_json(struct($"*")) as "value")
>       .writeStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("topic", "sub_topic")
>       .option("checkpointLocation", checkpoint_path)
>       .outputMode("append")
>       .trigger(ProcessingTime("1 seconds"))
>       .start()
>
>     t.awaitTermination()
>   }
> }
>
>
> Thanks
>
>


Re: spark structured streaming GroupState returns weird values from state

2020-03-31 Thread Jungtaek Lim
That seems to come from a difference between how Spark infers the schema
and how it creates the serializer / deserializer for Java beans when
constructing the bean encoder.

When inferring the schema for a Java bean, all properties that have getter
methods are considered. When creating the serializer / deserializer, only
properties that have both getter and setter methods are considered. The Java
Introspector is used here, and it doesn't seem to deal with fields (it's a
JDK feature, hence I guess that's the right definition of Java beans), so
getter methods without a matching setter method might lead to problems.
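
To check whether a bean has getters without a matching setter, the JDK's
java.beans.Introspector can list them. The following is a minimal sketch, not
Spark's own code: the names BeanPropertyCheck, SessionInfo and
getterOnlyProperties are hypothetical, and SessionInfo only imitates the kind
of state bean discussed in this thread (a derived "duration" getter with no
setter and no field).

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanPropertyCheck {

    // Hypothetical state bean: "durationMillis" has a getter but no setter.
    public static class SessionInfo {
        private String productId;
        private long firstEventTimeMillis;
        private long lastEventTimeMillis;

        public String getProductId() { return productId; }
        public void setProductId(String productId) { this.productId = productId; }
        public long getFirstEventTimeMillis() { return firstEventTimeMillis; }
        public void setFirstEventTimeMillis(long v) { this.firstEventTimeMillis = v; }
        public long getLastEventTimeMillis() { return lastEventTimeMillis; }
        public void setLastEventTimeMillis(long v) { this.lastEventTimeMillis = v; }

        // Derived value: getter only, no backing field.
        public long getDurationMillis() { return lastEventTimeMillis - firstEventTimeMillis; }
    }

    /** Returns the names of bean properties that have a getter but no setter. */
    public static List<String> getterOnlyProperties(Class<?> beanClass) {
        try {
            // Stop at Object so the built-in "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<String> result = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() == null) {
                    result.add(pd.getName());
                }
            }
            return result;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Cannot introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        // Prints [durationMillis]
        System.out.println(getterOnlyProperties(SessionInfo.class));
    }
}
```

Any property reported by such a check would be a candidate for the schema /
serializer mismatch described above; moving the derived value to a method that
does not follow the getX naming convention is one way to keep it out of the
bean's properties.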

The code line has a TODO, but the code is ancient (added in 2015) - I have no
context and I'm not 100% sure that fixing it would be safe. It would be
great if you could provide a simple reproducer to play with, but given
you've fixed the issue...

On Tue, Mar 31, 2020 at 5:01 PM Srinivas V  wrote:

>
> Never mind. It got resolved after I removed two extra getter methods (to
> calculate duration) that I had created in my state-specific Java bean
> (ProductSessionInformation). But I am surprised that they caused so much
> trouble. I guess that when this bean is converted to a Scala class, it may
> not take care of getter methods that do not belong to the defined fields?
> Still, how does that corrupt the state object so badly?
>
>
> On Sat, Mar 28, 2020 at 7:46 PM Srinivas V  wrote:
>
>> OK, I will try to create some simple code to reproduce it, if I can. The
>> problem is that I am adding this code to an existing big project with several
>> dependencies, with an older Spark Streaming version (2.2) at the root level, etc.
>>
>> Also, I observed that there is @Experimental on the GroupState class. What
>> state is it in now? Are several people using this feature in prod?
>>
>>

Re: Data Source - State (SPARK-28190)

2020-03-30 Thread Jungtaek Lim
Hi Bryan,

Thanks for the interest! Unfortunately, SPARK-28190 lacks support from
committers (I have been struggling with a lack of support for Structured
Streaming contributions). I hope things will get better, but in
the meantime, could you please try out my own project instead?

https://github.com/HeartSaVioR/spark-state-tools

It's not super convenient to use as of now, as Structured Streaming doesn't
store the schema for state. The schema has to be provided manually or taken
from the actual query. An improvement is proposed via SPARK-27237, but it
also has no activity right now due to the same lack of support.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, Mar 31, 2020 at 4:50 AM Bryan Jeffrey 
wrote:

> Hi, Jungtaek.
>
> We've been investigating the use of Spark Structured Streaming to replace
> our Spark Streaming operations.  We have several cases where we're using
> mapWithState to maintain state across batches, often with high volumes of
> data.  We took a look at the Structured Streaming stateful processing.
> Structured Streaming state processing looks great, but has some
> shortcomings:
> 1. State can only be hydrated from checkpoint, which means that
> modification of the state is not possible.
> 2. You cannot clean up or normalize state data after it has been processed.
>
> These shortcomings appear to be potentially addressed by your
> ticket SPARK-28190 - "Data Source - State".  I see little activity on this
> ticket. Can you help me to understand where this feature currently stands?
>
> Thank you,
>
> Bryan Jeffrey
>


Re: Spark Structure Streaming | FileStreamSourceLog not deleting list of input files | Spark -2.4.0

2020-04-21 Thread Jungtaek Lim
You're hitting an existing issue,
https://issues.apache.org/jira/browse/SPARK-17604. While there's no active
PR to address it, I've been planning to take a look sooner rather than later.

Btw, you may also want to take a look at my previous mail - the topic of
that thread was file stream sink metadata growing bigger, but
it's basically the same issue, so you may get some information
from there. (tl;dr: I have a bunch of PRs addressing multiple issues in the
file stream source and sink; they just lack some love.)

https://lists.apache.org/thread.html/rb4ebf1d20d13db0a78694e8d301e51c326f803cb86fc1a1f66f2ae7e%40%3Cuser.spark.apache.org%3E

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, Apr 21, 2020 at 8:23 PM Pappu Yadav  wrote:

> Hi Team,
>
> While running Spark, we made the following findings.
>
>    1. FileStreamSourceLog is responsible for maintaining the list of input
>    source files.
>    2. Spark Streaming deletes expired log files based on
>    *spark.sql.streaming.fileSource.log.deletion* and
>    *spark.sql.streaming.minBatchesToRetain*.
>    3. But while compacting logs, Spark Streaming writes the complete list
>    of files the stream has seen so far into one single .compact file in HDFS.
>    4. Over time this compact file grows to around 2GB-5GB in HDFS, which
>    delays the creation of the compact file after every 10th batch and also
>    increases the job restart time.
>    5. Why does Spark Streaming keep logging files that are already deleted
>    from the system? When creating the compact file there should be some
>    configurable timeout so that Spark can skip writing expired input files.
>
> *Also, kindly let me know if I missed something and there is already some
> configuration to handle this.*
>
> Regards
> Pappu Yadav
>


Re: is RocksDB backend available in 3.0 preview?

2020-04-22 Thread Jungtaek Lim
Sorry, I should have been clearer.

The discussion concluded that the RocksDB state store cannot be
included in the Spark main codebase - it should start as an individual project,
and can be adopted once the project is popular enough. (See the PR for more
details.) That's why I pointed to an implementation in the Spark ecosystem.

On Thu, Apr 23, 2020 at 1:22 AM kant kodali  wrote:

> is it going to make it in 3.0?
>
> On Tue, Apr 21, 2020 at 9:24 PM Jungtaek Lim 
> wrote:
>
>> Unfortunately, the short answer is no. Please refer to the last part of the
>> discussion on the PR https://github.com/apache/spark/pull/24922
>>
>> Unless we get a native implementation of this, I guess this project is the
>> most widely known implementation of a RocksDB-backed state store -
>> https://github.com/chermenin/spark-states
>>
>> On Wed, Apr 22, 2020 at 11:32 AM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> 1. Is the RocksDB backend available in the 3.0 preview?
>>> 2. If RocksDB can store the intermediate results of a stream-stream join,
>>> can I run streaming join queries forever? By forever I mean until I run out
>>> of disk. Put another way, can I run stream-stream join queries for years
>>> if necessary (imagine I have a lot of disk capacity but not a whole lot of
>>> RAM)?
>>> 3. Does it do incremental checkpointing to HDFS?
>>>
>>> Thanks!
>>>
>>>


Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Jungtaek Lim
If there are no third-party libraries in the dump, then why not share the
thread dump? (I mean, the output of jstack.)

A stack trace would be more helpful for finding which thread acquired the lock
and which other threads are waiting to acquire it, if we suspect a deadlock.
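
For reference, the same lock ownership information that jstack prints can also
be sampled from inside a JVM through the standard java.lang.management API.
This is only a sketch under assumptions: the class name LockReport and its
methods are hypothetical, it reports on the JVM it runs in (so it would have to
run inside the driver process), and it is not a replacement for sharing the
actual jstack output.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class LockReport {

    /** One line per BLOCKED thread: who is blocked, on which lock, held by whom. */
    public static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        // true, true: include locked monitors and ownable synchronizers.
        for (ThreadInfo ti : mx.dumpAllThreads(true, true)) {
            if (ti.getThreadState() == Thread.State.BLOCKED) {
                lines.add(ti.getThreadName() + " blocked on " + ti.getLockName()
                        + " held by " + ti.getLockOwnerName());
            }
        }
        return lines;
    }

    /** True only for a real deadlock, i.e. a cycle of threads waiting on each other. */
    public static boolean hasDeadlock() {
        long[] ids = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
        return ids != null && ids.length > 0;
    }

    public static void main(String[] args) throws InterruptedException {
        // Sample a few times, one second apart, to see whether the lock owner changes.
        for (int i = 0; i < 3; i++) {
            System.out.println("sample " + i + ": deadlock=" + hasDeadlock());
            blockedThreads().forEach(System.out::println);
            Thread.sleep(1000);
        }
    }
}
```

If the owner of a lock keeps changing between samples, the blocked threads are
most likely just contending for it; a deadlock would show the same owner and
the same waiters in every sample.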

On Wed, Apr 22, 2020 at 2:38 AM Ruijing Li  wrote:

> After refreshing a couple of times, I notice the lock is being swapped
> between these 3. The other 2 will be blocked by whoever gets this lock, in
> a cycle of 160 has lock -> 161 -> 159 -> 160
>
> On Tue, Apr 21, 2020 at 10:33 AM Ruijing Li  wrote:
>
>> In thread dump, I do see this
>> - SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE |
>> Monitor
>> - SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>> Blocked by Thread(Some(160)) Lock
>> -  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>> Blocked by Thread(Some(160)) Lock
>>
>> Could the fact that 160 has the monitor but is not running be causing a
>> deadlock preventing the job from finishing?
>>
>> I do see my Finalizer and main method are waiting. I don’t see any other
>> threads from 3rd party libraries or my code in the dump. I do see spark
>> context cleaner has timed waiting.
>>
>> Thanks
>>
>>
>> On Tue, Apr 21, 2020 at 9:58 AM Ruijing Li  wrote:
>>
>>> Strangely enough I found an old issue that is the exact same issue as
>>> mine
>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343
>>>
>>> However I’m using spark 2.4.4 so the issue should have been solved by
>>> now.
>>>
>>> Like the user in the jira issue I am using mesos, but I am reading from
>>> oracle instead of writing to Cassandra and S3.
>>>
>>>
>>> On Thu, Apr 16, 2020 at 1:54 AM ZHANG Wei  wrote:
>>>
>>>> The thread dump table in the Spark UI can provide some clues for finding
>>>> thread lock issues, such as:
>>>>
>>>>   Thread ID | Thread Name                  | Thread State | Thread Locks
>>>>   13        | NonBlockingInputStreamThread | WAITING      | Blocked by Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951})
>>>>   48        | Thread-16                    | RUNNABLE     | Monitor(jline.internal.NonBlockingInputStream@103008951})
>>>>
>>>> Each thread row shows its call stack when clicked, so you can check the
>>>> root cause of a held lock, like this (thread 48 above):
>>>>
>>>>   org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native Method)
>>>>   org.fusesource.jansi.internal.Kernel32.readConsoleInputHelper(Kernel32.java:811)
>>>>   org.fusesource.jansi.internal.Kernel32.readConsoleKeyInput(Kernel32.java:842)
>>>>   org.fusesource.jansi.internal.WindowsSupport.readConsoleInput(WindowsSupport.java:97)
>>>>   jline.WindowsTerminal.readConsoleInput(WindowsTerminal.java:222)
>>>>   
>>>>
>>>> Hope it can help you.
>>>>
>>>> --
>>>> Cheers,
>>>> -z
>>>>
>>>> On Thu, 16 Apr 2020 16:36:42 +0900
>>>> Jungtaek Lim  wrote:
>>>>
>>>> > Take thread dumps continuously, at a fixed interval (like 1s), and see how
>>>> > the stack / lock of each thread changes. (This is not easy to do in the UI,
>>>> > so doing it manually may be the only option. I'm not sure the Spark UI
>>>> > provides the same, as I haven't used it at all.)
>>>> >
>>>> > It will tell you which thread is being blocked (even if it's shown as
>>>> > running) and which point to look at.
>>>> >
>>>> > On Thu, Apr 16, 2020 at 4:29 PM Ruijing Li 
>>>> wrote:
>>>> >
>>>> > > Once I do a thread dump, what should I be looking for to tell where it
>>>> > > is hanging? I am seeing a lot of timed_waiting and waiting on the driver.
>>>> > > The driver is also being blocked by the Spark UI. If there are no tasks,
>>>> > > is there a point in taking thread dumps of the executors?
>>>> > >
>>>> > > On Tue, Apr 14, 2020 at 4:49 AM Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com>
>>>> > > wrote:
>>>> > >
>>

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Jungtaek Lim
No, that's not a thing to apologize for. It's just your call - less context
would bring less reaction and interest.

On Wed, Apr 22, 2020 at 11:50 AM Ruijing Li  wrote:

> I apologize, but I cannot share it, even if it is just typical spark
> libraries. I definitely understand that limits debugging help, but wanted
> to understand if anyone has encountered a similar issue.
>

Re: is RocksDB backend available in 3.0 preview?

2020-04-21 Thread Jungtaek Lim
Unfortunately, the short answer is no. Please refer to the last part of the
discussion on the PR https://github.com/apache/spark/pull/24922

Unless we get a native implementation of this, I guess this project is the
most widely known implementation of a RocksDB-backed state store -
https://github.com/chermenin/spark-states

On Wed, Apr 22, 2020 at 11:32 AM kant kodali  wrote:

> Hi All,
>
> 1. Is the RocksDB backend available in the 3.0 preview?
> 2. If RocksDB can store the intermediate results of a stream-stream join,
> can I run streaming join queries forever? By forever I mean until I run out
> of disk. Put another way, can I run stream-stream join queries for years
> if necessary (imagine I have a lot of disk capacity but not a whole lot of
> RAM)?
> 3. Does it do incremental checkpointing to HDFS?
>
> Thanks!
>
>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Jungtaek Lim
Replied inline:

On Sun, May 3, 2020 at 6:25 PM Magnus Nilsson  wrote:

> Thank you, so that would mean Spark gets the current latest offset(s) when
> the trigger fires and then processes all available messages in the topic up to
> and including that offset, as long as maxOffsetsPerTrigger is the default of
> None (or large enough to handle all available messages).
>

Yes, it starts from the offset of the latest batch. `maxOffsetsPerTrigger` will
be ignored starting from Spark 3.0.0, which I guess means that in Spark 2.x it
still takes effect even when Trigger.Once is used.
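
The difference can be illustrated with a toy model. This is not Spark code and
calls no Spark API; the class TriggerOnceModel and its parameters are
hypothetical, partitions are ignored, and the Spark 2.x branch encodes the
guess above rather than verified behavior.

```java
public class TriggerOnceModel {

    /**
     * End offset (exclusive) of the single micro-batch that Trigger.Once runs.
     *
     * @param start         first offset not yet processed (from the checkpoint)
     * @param latest        latest available offset when the trigger fires
     * @param maxPerTrigger read limit such as maxOffsetsPerTrigger, or -1 if unset
     * @param limitHonored  true models the assumed Spark 2.x behavior,
     *                      false models Spark 3.0.0 and later
     */
    public static long endOffset(long start, long latest, long maxPerTrigger,
                                 boolean limitHonored) {
        if (limitHonored && maxPerTrigger >= 0) {
            // The batch stops at the read limit even if more input is available.
            return Math.min(latest, start + maxPerTrigger);
        }
        // The batch consumes everything available when the trigger fired.
        return latest;
    }

    public static void main(String[] args) {
        System.out.println(endOffset(100, 1_000, 250, true));   // prints 350
        System.out.println(endOffset(100, 1_000, 250, false));  // prints 1000
    }
}
```

Under this model, a Trigger.Once run with a read limit on 2.x would leave
offsets 350 to 999 for the next run, while on 3.0.0 the single batch would
catch up completely.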


>
> I think the word micro-batch confused me (more like mega-batch in some
> cases). It makes sense though, this makes Trigger.Once a fixed interval
> trigger that's only fired once and not repeatedly.
>

"micro" is relative - though Spark by default processes all available
input per batch, in most cases you'll want to make the batch size
(interval) as small as possible, as it defines the latency of the output.
Trigger.Once is an unusual case for a streaming workload - it's more like
continuous execution of a "batch". By "continuous" I mean picking up the latest
context, which is the characteristic of a streaming query; hence it's a hybrid.


>
>
> On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim 
> wrote:
>
>> If I understand correctly, Trigger.Once executes only one micro-batch and
>> terminates, that's all. Your understanding of Structured Streaming applies
>> there as well.
>>
>> It's like a hybrid approach: it brings incremental processing from
>> micro-batch while keeping the processing interval of a batch job. That said,
>> while it lets you get the benefits of both sides, it's basically Structured
>> Streaming, inheriting all the limitations of Structured Streaming compared
>> to a batch query.
>>
>> Spark 3.0.0 will bring a change to Trigger.Once (SPARK-30669 [1]) -
>> Trigger.Once will "ignore" the per-micro-batch read limit on the data source
>> (like maxOffsetsPerTrigger) and process as much of the available input as
>> possible. (Data sources have to migrate to the new API for this to take
>> effect, but it works for built-in data sources like file and Kafka.)
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-30669
>>
>> On Sat, May 2, 2020 at 5:35 PM Magnus Nilsson wrote:
>>
>>> I've always had a question about Trigger.Once that I never got around to
>>> asking or testing myself. Say you have a 24/7 stream to a Kafka topic.
>>>
>>> Will Trigger.Once get the last offset(s) when it starts and then quit
>>> once it hits those offset(s), or will the job run until no new messages
>>> are added to the topic for a particular amount of time?
>>>
>>> br,
>>>
>>> Magnus
>>>
>>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>>>
>>>> Hi Rishi,
>>>>
>>>> That is exactly why Trigger.Once was created for Structured Streaming.
>>>> The way we look at streaming is that it doesn't have to be always real
>>>> time, or 24-7 always on. We see streaming as a workflow that you have to
>>>> repeat indefinitely. See this blog post for more details!
>>>>
>>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I recently started playing with spark streaming, and checkpoint
>>>>> location feature looks very promising. I wonder if anyone has an opinion
>>>>> about using spark streaming with checkpoint location option as a slow
>>>>> batch processing solution. What would be the pros and cons of utilizing
>>>>> streaming with checkpoint location feature to achieve fault tolerance in
>>>>> batch processing application?
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Rishi Shah
>>>>>
>>>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Jungtaek Lim
If I understand correctly, Trigger.Once executes only one micro-batch and
terminates, that's all. Your understanding of Structured Streaming applies
there as well.

It's like a hybrid approach: it brings incremental processing from
micro-batch while keeping the processing interval of a batch job. That said,
while it lets you get the benefits of both sides, it's basically Structured
Streaming, inheriting all the limitations of Structured Streaming compared
to a batch query.

Spark 3.0.0 will bring a change to Trigger.Once (SPARK-30669 [1]) -
Trigger.Once will "ignore" the per-micro-batch read limit on the data source
(like maxOffsetsPerTrigger) and process as much of the available input as
possible. (Data sources have to migrate to the new API for this to take
effect, but it works for built-in data sources like file and Kafka.)

1. https://issues.apache.org/jira/browse/SPARK-30669

On Sat, May 2, 2020 at 5:35 PM Magnus Nilsson wrote:

> I've always had a question about Trigger.Once that I never got around to
> asking or testing myself. Say you have a 24/7 stream to a Kafka topic.
>
> Will Trigger.Once get the last offset(s) when it starts and then quit once
> it hits those offset(s), or will the job run until no new messages are added
> to the topic for a particular amount of time?
>
> br,
>
> Magnus
>
> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>
>> Hi Rishi,
>>
>> That is exactly why Trigger.Once was created for Structured Streaming.
>> The way we look at streaming is that it doesn't have to be always real
>> time, or 24-7 always on. We see streaming as a workflow that you have to
>> repeat indefinitely. See this blog post for more details!
>>
>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>
>> Best,
>> Burak
>>
>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>> wrote:
>>
>>> Hi All,
>>>
>>> I recently started playing with spark streaming, and checkpoint location
>>> feature looks very promising. I wonder if anyone has an opinion about using
>>> spark streaming with checkpoint location option as a slow batch processing
>>> solution. What would be the pros and cons of utilizing streaming with
>>> checkpoint location feature to achieve fault tolerance in batch processing
>>> application?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>


Re: [Structured Streaming] NullPointerException in long running query

2020-04-28 Thread Jungtaek Lim
The root cause of the exception occurred on the executor side ("Lost task
10.3 in stage 1.0 (TID 81, spark6, executor 1)"), so you may need to check
the executor logs there.

On Tue, Apr 28, 2020 at 2:52 PM lec ssmi  wrote:

> Hi:
>   One of my long-running queries occasionally encountered the following
> exception:
>
>
>> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost task 10.3 in stage 1.0 (TID 81, spark6, executor 1): java.lang.NullPointerException
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>> at scala.Option.foreach(Option.scala:257)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
>> at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
>> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
>> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>> ... 1 more
>
>
>
> According to the exception stack, it seems to have 

Re: Pyspark Kafka Structured Stream not working.

2020-05-06 Thread Jungtaek Lim
Hi,

1. You seem to be using DStream (Spark Streaming), not Structured Streaming.
2. I'm not familiar with pyspark, but the error message looks very clear -
Kafka doesn't allow such a name for "client.id". The error message states
the naming rule, so you need to follow that convention (e.g. no spaces).
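
As a minimal sketch of the naming rule quoted in the error message (ASCII
alphanumerics, '.', '_' and '-'), the check below can be run on a candidate
name before it is handed to Kafka. The helper names are hypothetical, and
replacing illegal characters with '-' is just one possible choice, not
something Kafka or Spark does for you.

```python
import re

# Characters the error message lists as legal for client.id.
LEGAL_CLIENT_ID = re.compile(r"[A-Za-z0-9._-]+")


def is_legal_client_id(name):
    """True if the whole name consists only of the allowed characters."""
    return LEGAL_CLIENT_ID.fullmatch(name) is not None


def sanitize_client_id(name):
    """Replace every disallowed character with '-' (an arbitrary choice)."""
    return re.sub(r"[^A-Za-z0-9._-]", "-", name)


original = "Python Kafka streamer"
print(is_legal_client_id(original))                       # False
print(sanitize_client_id(original))                       # Python-Kafka-streamer
print(is_legal_client_id(sanitize_client_id(original)))   # True
```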

Hope this helps,

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, May 6, 2020 at 5:36 PM Vijayant Kumar
 wrote:

> Hi All,
>
>
>
> I am getting the below error while using Pyspark Structured Streaming from
> Kafka Producer.
>
>
>
> 20/05/06 11:51:16 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Error starting receiver 0 - kafka.common.InvalidConfigException:
> client.id Python Kafka streamer is illegal, contains a character other
> than ASCII alphanumerics, '.', '_' and '-'
>
>
>
> I am using the below code to get the messages:
>
>
>
> broker='vm105:2181'
> topic='Hello-Kafka'
> print 'broker topic is ',broker,topic
> kvs = KafkaUtils.createStream(ssc, \
>   broker, \
>   "Python Kafka streamer",{topic:1})
>
>
>
> And my Submit command is like below :-
>
> *spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar
> test_kafka.py vm105:2181 Hello-Kafka*
>
>
>
> Can anyone help me figure out what I am missing?
>
>
>
> Thanks,
>
> Vijayant
> --
>
> This e-mail message may contain confidential or proprietary information of
> Mavenir Systems, Inc. or its affiliates and is intended solely for the use
> of the intended recipient(s). If you are not the intended recipient of this
> message, you are hereby notified that any review, use or distribution of
> this information is absolutely prohibited and we request that you delete
> all copies in your control and contact us by e-mailing to
> secur...@mavenir.com. This message contains the views of its author and
> may not necessarily reflect the views of Mavenir Systems, Inc. or its
> affiliates, who employ systems to monitor email messages, but make no
> representation that such messages are authorized, secure, uncompromised, or
> free from computer viruses, malware, or other defects. Thank You
>


Re: No. of active states?

2020-05-07 Thread Jungtaek Lim
Have you looked at the metrics for state operators?

They have been providing the "total rows" of state, and starting from Spark
2.4 they also provide additional metrics specific to
HDFSBackedStateStoreProvider, including the overall estimated memory usage.

https://github.com/apache/spark/blob/24fac1e0c70a783b4d240607639ff20d7dd24191/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L168-L179
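
For illustration, the sketch below sums the state operator metrics out of one
streaming query progress report. It works on a hand-written sample in the
shape of the progress JSON; the numbers are made up, and the helper name
summarize_state is hypothetical. In PySpark the same kind of dictionary is
what query.lastProgress returns.

```python
import json

# Hand-written sample in the shape of a streaming query progress report.
# All values are invented for this example.
sample_progress_json = """
{
  "batchId": 12,
  "numInputRows": 5000,
  "stateOperators": [
    {"numRowsTotal": 1200, "numRowsUpdated": 300, "memoryUsedBytes": 410000},
    {"numRowsTotal": 800, "numRowsUpdated": 50, "memoryUsedBytes": 260000}
  ]
}
"""


def summarize_state(progress):
    """Sum state rows and estimated state memory over all stateful operators."""
    operators = progress.get("stateOperators", [])
    return {
        "total_state_rows": sum(op.get("numRowsTotal", 0) for op in operators),
        "total_state_memory_bytes": sum(
            op.get("memoryUsedBytes", 0) for op in operators
        ),
    }


summary = summarize_state(json.loads(sample_progress_json))
print(summary)
```

The summed row count is the closest thing to a "number of states" that the
progress report exposes, so it is a natural candidate to feed into an
auto-scaling decision.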


On Fri, May 8, 2020 at 11:30 AM Something Something <
mailinglist...@gmail.com> wrote:

> No. We are already capturing these metrics (e.g. numInputRows,
> inputRowsPerSecond).
>
> I am talking about "No. of States" in the memory at any given time.
>
> On Thu, May 7, 2020 at 4:31 PM Jungtaek Lim 
> wrote:
>
>> If you're referring to the total number of "entries" across all states in
>> an SS job, it's provided via StreamingQueryListener.
>>
>>
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>>
>> Hope this helps.
>>
>> On Fri, May 8, 2020 at 3:26 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Is there a way to get the total no. of active states in memory at any
>>> given point in a Stateful Spark Structured Streaming job? We are thinking
>>> of using this metric for 'Auto Scaling' our Spark cluster.
>>>
>>


Re: [E] Re: Pyspark Kafka Structured Stream not working.

2020-05-07 Thread Jungtaek Lim
It's not either 1 or 2; both points apply. I haven't played with
DStream + pyspark, but given that the error message is clear, you'll probably
want to change the client.id "Python Kafka streamer" to follow the naming
convention stated in the error message.

On Thu, May 7, 2020 at 3:55 PM Vijayant Kumar 
wrote:

> Hi Jungtaek,
>
>
>
> Thanks for the response. It appears to be #1.
>
> I would appreciate it if you could share a sample command to submit the
> Spark application.
>
>
>
> *From:* Jungtaek Lim [mailto:kabhwan.opensou...@gmail.com]
> *Sent:* Wednesday, May 06, 2020 8:24 PM
> *To:* Vijayant Kumar 
> *Cc:* user@spark.apache.org
> *Subject:* [E] Re: Pyspark Kafka Structured Stream not working.
>
>
>
>
> Hi,
>
>
>
> 1. You seem to be using DStream (Spark Streaming), not Structured Streaming.
>
> 2. I'm not familiar with pyspark, but the error message looks very clear -
> Kafka doesn't allow such a name for "client.id". The error message states
> the naming rule, so you need to follow that convention (e.g. no spaces).
>
>
>
> Hope this helps,
>
>
>
> Thanks,
>
> Jungtaek Lim (HeartSaVioR)
>
>
>
> On Wed, May 6, 2020 at 5:36 PM Vijayant Kumar <
> vijayant.ku...@mavenir.com.invalid> wrote:
>
> Hi All,
>
>
>
> I am getting the below error while using Pyspark Structured Streaming from
> Kafka Producer.
>
>
>
> 20/05/06 11:51:16 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Error starting receiver 0 - kafka.common.InvalidConfigException:
> client.id Python Kafka streamer is illegal, contains a character other
> than ASCII alphanumerics, '.', '_' and '-'
>
>
>
> I am using the below code to get the messages:
>
>
>
> broker='vm105:2181'
> topic='Hello-Kafka'
> print 'broker topic is ',broker,topic
> kvs = KafkaUtils.createStream(ssc, \
>   broker, \
>   "Python Kafka streamer",{topic:1})
>
>
>
> And my Submit command is like below :-
>
> *spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar
> test_kafka.py vm105:2181 Hello-Kafka*
>
>
>
> Can anyone help me figure out what I am missing?
>
>
>
> Thanks,
>
> Vijayant
>


Re: No. of active states?

2020-05-07 Thread Jungtaek Lim
If you're referring to the total number of "entries" across all states in an
SS job, it's provided via StreamingQueryListener.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries

Hope this helps.

On Fri, May 8, 2020 at 3:26 AM Something Something 
wrote:

> Is there a way to get the total no. of active states in memory at any
> given point in a Stateful Spark Structured Streaming job? We are thinking
> of using this metric for 'Auto Scaling' our Spark cluster.
>


Re: [structured streaming] [stateful] Null value appeared in non-nullable field

2020-05-23 Thread Jungtaek Lim
Hi,

A stack trace alone doesn't give enough to look into. It'd be better to
provide a simple reproducer (code and problematic inputs) so that someone
can give it a try.

You may also want to try 3.0.0; RC2 is better to test against, but preview2
would be easier for end users to test.

On Sat, May 23, 2020 at 8:14 PM Srinivas V wrote:

> Hello,
>  I am listening to a Kafka topic through Spark Structured Streaming
> [2.4.5]. After processing messages for a few minutes, I get the
> NullPointerException below. I have three beans here: 1. Event 2. StateInfo
> 3. SessionUpdateInfo. I suspect that the problem is with StateInfo:
> it might be failing when it writes state to HDFS, or it could be failing
> while I update accumulators. But why would it fail for some events but not
> for others? Once it fails, it stops the streaming query.
> When I send all fields as null except EventId in my testing, it works fine.
> Any idea what could be happening?
> Attaching the full stack trace as well.
> This is a YARN cluster, saving state in HDFS.
>
> Exception:
>
> 20/05/23 09:46:46 ERROR org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborting commit for partition 42 (task 118121, attempt 9, stage 824.0)
> 20/05/23 09:46:46 ERROR org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborted commit for partition 42 (task 118121, attempt 9, stage 824.0)
> 20/05/23 09:46:46 ERROR org.apache.spark.executor.Executor: Exception in task 42.9 in stage 824.0 (TID 118121)
> java.lang.NullPointerException: Null value appeared in non-nullable field:
> top level input bean
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.serializefromobject_doConsume_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>   at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
>   at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
>   at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>   at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
>   at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
>   at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> 20/05/23 09:47:48 ERROR org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
>
>
>
> Regards
>
> Srini V
>
>


Re: Elastic Search sink showing -1 for numOutputRows

2020-09-07 Thread Jungtaek Lim
I don't know about the ES sink. The availability of "numOutputRows" depends on
the API version the sink implements (DSv1 vs DSv2), so you would be
better off asking the author of the ES sink to confirm the case.

On Tue, Sep 8, 2020 at 5:15 AM jainshasha  wrote:

> Hi,
>
> I am using Spark Structured Streaming and sinking the data into
> ElasticSearch. In the stats emitted for each batch, "numOutputRows" always
> shows -1 for the ElasticSearch sink, whereas for other sinks like Kafka it
> shows either 0 or some value when data is emitted.
>
> What could be the reason for showing -1 for ElasticSearch?
>
>
>
>
>


Re: Keeping track of how long something has been in a queue

2020-09-07 Thread Jungtaek Lim
You may want to google around "session window" and "duration", and check
whether the concept fits your requirements. Probably adding some custom
logic on top of the session window would work for you, which requires you
to implement a custom function for flatMapGroupsWithState.
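
To show the idea without Spark, here is a plain-Python sketch of the
per-key logic such a custom function would carry: keep the first and last
time a key was seen, start a new session when the gap is too large, and
report last minus first as the time in the queue. The 30-second gap and the
function name session_durations are assumptions made for this example; the
timestamps are the ones quoted below.

```python
from datetime import datetime, timedelta


def session_durations(events, gap=timedelta(seconds=30)):
    """Group (key, timestamp) events into sessions and return durations.

    A new session starts when a key has not been seen for longer than `gap`.
    Returns {key: [duration in seconds of each session]}.
    """
    sessions_by_key = {}
    for key, ts in sorted(events, key=lambda e: (e[0], e[1])):
        sessions = sessions_by_key.setdefault(key, [])
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1][1] = ts          # extend the current session
        else:
            sessions.append([ts, ts])     # open a new session [first, last]
    return {
        key: [(last - first).total_seconds() for first, last in sessions]
        for key, sessions in sessions_by_key.items()
    }


fmt = "%H:%M:%S"
events = [("green.th", datetime.strptime(t, fmt))
          for t in ("13:04:04", "13:04:05", "13:04:17")]
print(session_durations(events))  # {'green.th': [13.0]}
```

In a real query the [first, last] pair would live in the group state and the
gap would be enforced through the state timeout.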

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Sep 4, 2020 at 11:21 PM Hamish Whittal 
wrote:

> Sorry, I moved a paragraph,
>
> (2) If Ms green.th was first seen at 13:04:04, then at 13:04:05 and
>> finally at 13:04:17, she's been in the queue for 13 seconds (ignoring the
>> ms).
>>
>


Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
batch interval and
checkpoint interval to provide flexibility. Say, I can tolerate
reprocessing up to 10 mins of data when a failure occurs, but
due to the output latency I need a micro-batch interval of 30 seconds.
(In other words, do a checkpoint about every 20 micro-batches.) That is
actually a bit tricky to implement, and I haven't seen any request for it,
so it is just a sketched idea.
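
The arithmetic behind "about 20 micro-batches" can be written down as
follows. This is only the calculation; Spark does not expose such a setting,
and the function name is hypothetical.

```python
def batches_per_checkpoint(tolerable_reprocess_seconds, micro_batch_seconds):
    """How many micro-batches fit into the tolerated reprocessing window."""
    return tolerable_reprocess_seconds // micro_batch_seconds


# Tolerate reprocessing 10 minutes of data, with 30-second micro-batches.
print(batches_per_checkpoint(10 * 60, 30))  # 20
```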

And the same also for the files in Spark local directory?
>
> * * *
>
> The numbers for checkpoint directory are, of course, captured when it was
> set to a local drive (or Lustre/NFS.).
>
> For HDFS there are obviously no local file system calls for the checkpoint
> store, as HDFS does not present itself as an OS-level file system.
> Nevertheless the name of checkpoint directory was transmitted over HDFS
> connection socket 1,675 times per microbatch, so the number of high-level
> HDFS file operations must have been at least that high.
>
> * * *
>
> On a related note, for 920,000 events Spark made 900,000 attempts to
> execute the chmod or readlink program, i.e. to launch an external subprocess
> with an executable in order to perform a file operation. Those 900,000
> attempts actually represent 150,000 cycles, and in each cycle Spark tried
> to launch the program from 6 different locations (/usr/local/sbin ->
> /usr/local/bin -> /usr/sbin -> /usr/bin -> /sbin -> /bin), until it
> finally found it in the last one. But then on the next cycle Spark/Hadoop
> does not reuse the knowledge of a previously found utility location, and
> repeats the search from the very start, causing useless file system search
> operations over and over again.
>
> This may or may not matter when HDFS is used for checkpoint store
> (depending on how HDFS server implements the calls), but it does matter
> when a file system like Lustre or NFS is used for checkpoint storage.
> (Not to mention spawning readlink and chmod does not seem like a bright
> idea in the first place, although perhaps there might be a reason why
> Hadoop layer does it this way).
>
> Thanks,
> Sergey
>
> On Mon, Oct 5, 2020 at 5:45 AM Jungtaek Lim 
> wrote:
>
>> First of all, you'd want to divide these numbers by the number of
>> micro-batches, as file creations in the checkpoint directory occur
>> similarly for every micro-batch.
>> Second, you'd want to dive into the checkpoint directory and get
>> separate numbers per top-level subdirectory.
>>
>> After that we can see whether the values make sense or not.
>>
>> Regarding file I/O issues in SS, the two issues I know about are:
>> 1) If you use streaming aggregation, it unnecessarily creates a temporary
>> file for both read and write on the state store, while the file is only
>> needed for writing. That doubles the number of file creations. A
>> patch is proposed under SPARK-30294 [1].
>>
>> 2) Spark leverages the HDFS API, which is configured to create a crc file
>> per file by default. (So you'll have 2x the files you'd expect.) There's a
>> bug in the HDFS API (HADOOP-16255 [2]) which fails to handle crc files
>> during rename (in short, the way checkpointing works in SS is that a temp
>> file is atomically renamed to the final file), and as a workaround
>> (SPARK-28025 [3]) Spark tries to delete the crc file, so two additional
>> operations (exists -> delete) may occur per crc file.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-30294
>> 2. https://issues.apache.org/jira/browse/HADOOP-16255
>> 3. https://issues.apache.org/jira/browse/SPARK-28025
>>
>>
>> On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev  wrote:
>>
>>> I am trying to run a Spark structured streaming program simulating basic
>>> scenario of ingesting events and calculating aggregates on a window with
>>> watermark, and I am observing an inordinate amount of disk IO Spark
>>> performs.
>>>
>>> The basic structure of the program is like this:
>>>
>>> sparkSession = SparkSession.builder()
>>>.appName()
>>>.master("local[*]")
>>>.config("spark.executor.memory", "8g")
>>>.config("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>.config("spark.kryoserializer.buffer", "8m")
>>>.config("spark.local.dir", ...local
>>> directory...)
>>

Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Jungtaek Lim
Sure. My point was that Delta Lake is also a 3rd party library,
and there's no way for Apache Spark itself to do that. Delta Lake has its own
group, and the request is better made there.

On Mon, Oct 5, 2020 at 9:54 PM Enrico Minack  wrote:

> Though spark.read. refers to "built-in" data sources, there is
> nothing that prevents 3rd party libraries to "extend" spark.read in Scala
> or Python. As users know the Spark-way to read built-in data sources, it
> feels natural to hook 3rd party data sources under the same scheme, to give
> users a holistic and integrated feel.
>
> One Scala example (
> https://github.com/G-Research/spark-dgraph-connector#spark-dgraph-connector
> ):
>
> import uk.co.gresearch.spark.dgraph.connector._
> val triples = spark.read.dgraph.triples("localhost:9080")
>
> and in Python:
>
> from gresearch.spark.dgraph.connector import *
> triples = spark.read.dgraph.triples("localhost:9080")
>
> I agree that 3rd parties should also support the official
> spark.read.format() and the new catalog approaches.
>
> Enrico
>
> On 05.10.20 at 14:03, Jungtaek Lim wrote:
>
> Hi,
>
> "spark.read." is a "shorthand" for "built-in" data sources, not
> for external data sources. spark.read.format() is still the official way to
> use them. Delta Lake is not included in Apache Spark, so it is indeed not
> possible for Spark to refer to it.
>
> Starting from Spark 3.0, the concept of a "catalog" is introduced: you
> can simply refer to a table from the catalog (if the external data source
> provides a catalog implementation) with no need to specify the format
> explicitly (as the catalog knows about it).
>
> This session explains the catalog and how the Cassandra connector leverages
> it. I see some external data sources starting to support catalogs, and in
> Spark itself there's some effort to support a catalog for JDBC.
>
> https://databricks.com/fr/session_na20/datasource-v2-and-cassandra-a-whole-new-world
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Mon, Oct 5, 2020 at 8:53 PM Moser, Michael <
> michael.mo...@siemens-healthineers.com> wrote:
>
>> Hi there,
>>
>>
>>
>> I’m just wondering if there is any incentive to implement read/write
>> methods in the DataFrameReader/DataFrameWriter for delta similar to e.g.
>> parquet?
>>
>>
>>
>> For example, using PySpark, “spark.read.parquet” is available, but
>> “spark.read.delta” is not (same for write).
>>
>> In my opinion, “spark.read.delta” feels more clean and pythonic compared
>> to “spark.read.format(‘delta’).load()”, especially if more options are
>> called, like “mode”.
>>
>>
>>
>> Can anyone explain the reasoning behind this, is this due to the Java
>> nature of Spark?
>>
>> From a pythonic point of view, I could also imagine a single read/write
>> method, with the format as an arg and kwargs related to the different file
>> format options.
>>
>>
>>
>> Best,
>>
>> Michael
>>
>>
>>
>>
>>
>


Re: Excessive disk IO with Spark structured streaming

2020-10-07 Thread Jungtaek Lim
I can't spend too much time explaining these one by one. Since you want to
know the "details", I strongly encourage you to do a deep dive instead of just
looking around - that's how open source works.

I'll give a general explanation instead of replying inline; I'd probably
write a blog post if there's no existing doc (I guess there should be
one) instead of putting too much time in here.

In short, the reason Spark has to create these files "per micro-batch" is
to ensure fault-tolerance. For example, if the query fails at batch 5 and
you rerun the query, it should rerun batch 5. How?

Spark needs to know the offsets the query had read up to batch 4, and
preferably the offsets the query read for batch 5. That's what
offsets/commits are for. State stores the accumulated values of stateful
operations. Same here - Spark should be able to read the state for batch 4
so that it can calculate the new accumulated values for batch 5. In
addition, the number of partitions is the max parallelism (partitions aren't
aware of each other and they shouldn't be), hence the state for each
partition has to be stored individually.

Storing 4 files (in the end we'll only have "2" files, but here I count
temp files with crc files, as we are talking about performance aspect) per
partition per micro-batch is the thing I already explained - I agree it's
not ideal, e.g. I submitted the PR for SPARK-30294 [1] which reduces the
number of files by half. Probably we could propose Hadoop to skip creating
CRC files (I'm not sure it can be simply done as of now), but Spark
is conservative about upgrading the versions for dependencies so it might
not be available soon even if we address it right away.
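
As a rough worked example of the "4 files per partition per micro-batch"
arithmetic, the sketch below uses the strace counts reported in the message
quoted further down (69 micro-batches; 56,232 state-file creations with 200
shuffle partitions and 2,760 with 10). The helper name is made up for
illustration.

```python
def state_files_per_partition_per_batch(total_creations, batches, partitions):
    """Average number of state-store file creations per partition per micro-batch."""
    return total_creations / batches / partitions


# Figures reported in the quoted message (strace, 69 micro-batches).
for partitions, creations in [(200, 56232), (10, 2760)]:
    per_batch = creations / 69
    per_partition = state_files_per_partition_per_batch(creations, 69, partitions)
    print(f"{partitions:>3} partitions: {per_batch:.1f} files/batch, "
          f"{per_partition:.2f} files/partition/batch")
```

Both settings come out at roughly 4 file creations per partition per
micro-batch, so the total scales with the number of shuffle partitions.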

As you've found here, it's super important to find the right value for
shuffle partitions. State is partitioned by a hash function, so the right
value strongly depends on the group key. If the cardinality of the group key
is low, the right number of shuffle partitions is probably fairly small.
Unfortunately, once the query has run you can't change the number of shuffle
partitions, as Spark has no feature for migrating state when the number of
partitions changes. Either you need to predict the overall cardinality at
a specific time and set the right value up front, or try a 3rd party state
tool. [2] (DISCLAIMER: I'm the author.)
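
A minimal sketch of why a low-cardinality group key wastes shuffle
partitions. CRC32 is only a stand-in here (Spark uses its own hash function,
so the actual partition ids will differ), and the key names are hypothetical.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash; the point is only that a key maps to hash(key) mod N.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def non_empty_partitions(keys, num_partitions: int) -> int:
    """Number of partitions that receive at least one key."""
    return len({partition_for(k, num_partitions) for k in keys})


keys = [f"source-{i}" for i in range(8)]  # hypothetical low-cardinality group key
for n in (200, 10):
    used = non_empty_partitions(keys, n)
    print(f"{n} shuffle partitions: {used} hold state, {n - used} stay empty")
```

With 8 distinct keys, at most 8 partitions can ever hold state, however many
partitions are configured. The counts reported in this thread suggest the
remaining partitions still pay the per-micro-batch file cost.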

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://github.com/HeartSaVioR/spark-state-tools


On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev  wrote:

> Hi Jungtaek,
>
> *> I meant the subdirectory inside the directory you're providing as
> "checkpointLocation", as there're several directories in that directory...*
>
> There are two:
>
> *my-spark-checkpoint-dir/MainApp*
> created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir for the app>)
> contains only empty subdir with GUID name
>
> *my-spark-checkpoint-dir/writer*
> created by ds.writeStream().option("checkpointLocation", <checkpoint dir for writer>)
> contains all the files
>
> Within the latter ("writer") there are four subdirectories: commits,
> metadata, offsets, state.
>
> Breakdown of file creations within them, per 69 microbatches (when shuffle
> partition count = 200) is:
>
> commits = 136
> metadata = 0
> offsets = 138
> state = 56232
>
> (Creation is identified by strace record for "openat" system call with
> O_CREAT flag and file path in the corresponding directory.)
>
> When shuffle partition count is 10, breakdown of file creations within
> them, per 69 microbatches, is:
>
> commits = 136
> metadata = 0
> offsets = 138
> state = 2760
>
> *> The size of the delta file heavily depends on your stateful operation
> and data in each micro-batch. delta file only captures the "changes" of
> state in specific micro-batch, so there're cases you'll have very tiny
> delta files, e.g. cardinality of grouped key is small (hence cardinality of
> KVs is also small), small amount of inputs are provided per micro-batch,
> the overall size of aggregated row is small, there's skew on grouped key
> (hence some partitions get no input or small inputs), etc.*
>
>
> In my case there is no key in the Row object (unless the bucketized
> "timestamp" for 2-min windows buckets becomes a key), and the microbatch is
> large enough: the whole problem is that Spark does not want to save the
> microbatch as a single file. Even after I reduce the number of shuffle
> partitions (see below), the number of files per microbatch remains
> significantly larger than the number of shuffle partitions.
>
> ..
>
> When the number of shuffle partitions is 200, Spark creates 816 files (per
> microbatch) in checkpoint store and 202 files in Spark local-dir.
>
> Of checkpoint files: 24 per microbatch are snapshot files, and 788 are
> delta files.
> The same per microbatch per partition: 0.12 snapshot files, 4 delta files.
> Of local-dir files: 200 temp_shuffle files per microbatch (as expected)
> and 2 other files (shuffle.data+shuffle.index).
>
> If I reduce the number of shuffle partitions, two things happen:
> - Throughput of a single pipeline 

Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Jungtaek Lim
Hi,

"spark.read." is a "shorthand" for "built-in" data sources, not for
external data sources. spark.read.format() is still the official way to use
them. Delta Lake is not part of Apache Spark, so Spark itself cannot
provide a shorthand for it.
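
On the "single read method with the format as an arg and kwargs" idea from
the original question: PySpark's DataFrameReader.load() already accepts a
format argument and source options as keyword arguments. A minimal sketch,
where the wrapper name read_as and the paths are made up for illustration:

```python
def read_as(spark, fmt, path, **options):
    """Read `path` with the given source format.

    `spark` is expected to be a SparkSession. Only DataFrameReader.load()
    is used, which takes the format and the source options as keywords.
    """
    return spark.read.load(path, format=fmt, **options)


# Usage with a real session (not executed here; the path is hypothetical):
# df = read_as(spark, "parquet", "/tmp/events", mergeSchema="true")
# df = read_as(spark, "delta", "/tmp/events")  # needs the Delta Lake package
```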

Starting from Spark 3.0, the concept of "catalog" is introduced: you can
simply refer to a table through the catalog (if the external data source
provides a catalog implementation), with no need to specify the format
explicitly (as the catalog knows about it).

This session explains the catalog and how Cassandra connector leverages it.
I see some external data sources starting to support catalog, and in Spark
itself there's some effort to support catalog for JDBC.
https://databricks.com/fr/session_na20/datasource-v2-and-cassandra-a-whole-new-world

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Oct 5, 2020 at 8:53 PM Moser, Michael <
michael.mo...@siemens-healthineers.com> wrote:

> Hi there,
>
>
>
> I’m just wondering if there is any incentive to implement read/write
> methods in the DataFrameReader/DataFrameWriter for delta similar to e.g.
> parquet?
>
>
>
> For example, using PySpark, “spark.read.parquet” is available, but
> “spark.read.delta” is not (same for write).
>
> In my opinion, “spark.read.delta” feels more clean and pythonic compared
> to “spark.read.format(‘delta’).load()”, especially if more options are
> called, like “mode”.
>
>
>
> Can anyone explain the reasoning behind this, is this due to the Java
> nature of Spark?
>
> From a pythonic point of view, I could also imagine a single read/write
> method, with the format as an arg and kwargs related to the different file
> format options.
>
>
>
> Best,
>
> Michael
>
>
>
>
>


Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
First of all, you'd want to divide these numbers by the number of
micro-batches, as file creations in checkpoint directory would occur
similarly per micro-batch.
Second, you'd want to dive inside the checkpoint directory and have
separate numbers per top-subdirectory.

After that we can see whether the value would make sense or not.

Regarding file I/O issues on SS, two issues I know about are:
1) If you use streaming aggregation, it unnecessarily creates a temporary
file for both read and write on the state store, while the file is only
needed for writing. That doubles the number of file creations. A patch is
proposed under SPARK-30294 [1].

2) Spark leverages the HDFS API, which is configured to create a crc file per
file by default. (So you'll have 2x as many files as expected.) There's a bug
in the HDFS API (HADOOP-16255 [2]) which fails to handle crc files during
rename (in short, the way checkpointing works in SS is that a temp file is
atomically renamed to the final file). As a workaround (SPARK-28025 [3]),
Spark tries to delete the crc file, so two additional operations
(exists -> delete) may occur per crc file.
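
The "write a temp file, then atomically rename it" pattern can be sketched
on a local filesystem as below. This is only an analogy in plain Python, not
Spark's CheckpointFileManager code; the function name and the file name
5.delta are made up, and no crc file is involved here.

```python
import os
import tempfile


def write_atomically(final_path: str, data: bytes) -> None:
    """Write to a temp file in the same directory, then rename it into place."""
    directory = os.path.dirname(final_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())
        # Readers see either no file or the complete file, never a partial write.
        os.replace(tmp_path, final_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


with tempfile.TemporaryDirectory() as d:
    target = os.path.join(d, "5.delta")
    write_atomically(target, b"state changes for batch 5")
    print(sorted(os.listdir(d)))
```

Each final file costs at least one extra create (the temp file) plus a
rename, which is part of why the per-micro-batch file operation count is
higher than the number of files that remain in the end.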

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://issues.apache.org/jira/browse/HADOOP-16255
3. https://issues.apache.org/jira/browse/SPARK-28025


On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev  wrote:

> I am trying to run a Spark structured streaming program simulating basic
> scenario of ingesting events and calculating aggregates on a window with
> watermark, and I am observing an inordinate amount of disk IO Spark
> performs.
>
> The basic structure of the program is like this:
>
> sparkSession = SparkSession.builder()
>     .appName()
>     .master("local[*]")
>     .config("spark.executor.memory", "8g")
>     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     .config("spark.kryoserializer.buffer", "8m")
>     .config("spark.local.dir", ...local directory...)
>     .getOrCreate();
>
> sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...);
>
> dataset = sparkSession.readStream()
>     .option("checkpointLocation", ... checkpoint dir for source ...)
>     .format(MockStreamingSource.class.getName())
>     .load();
>
> Dataset ds = dataset
>     .withWatermark("timestamp", "10 minutes")
>     .groupBy(
>         functions.window(functions.col("timestamp"), "2 minutes"),
>         functions.col("source"))
>     .agg(
>         functions.avg("D0").as("AVG_D0"),
>         functions.avg("I0").as("AVG_I0"));
>
> DataStreamWriter dsw = ds.writeStream()
>     // .trigger(Trigger.ProcessingTime("1 minute"))
>     .option("checkpointLocation", .. checkpoint dir for writer ... );
>
> dsw.outputMode(OutputMode.Append())
>     .format("console")
>     .option("truncate", "false")
>     .option("numRows", Integer.MAX_VALUE)
>     .start()
>     .awaitTermination();
>
>
> MockStreamingSource is just that -- a source intended to provide a
> simulated input. It generates microbatches of mock events and sends them to
> the app. In the testing scenario, the source simulates 20,000 devices each
> sending an event every 15 seconds for 11.5 minutes of logical time (just
> under 12 minutes of window size + watermark), for a total number of 920,000
> events.
>
> I initially started with microbatch sized to 500 events, and processing
> performance was totally dismal because of disk IO. I then increased
> microbatch size and performance got better, but still very poor. Microbatch
> size now is 13,334 events per batch, this corresponds to ingestion interval
> of 10 seconds. Smaller batches resulted in worse performance.
>
> But even with microbatch sized 13,334 event performance is poor because of
> excessive disk IO generated by Spark.
> Just ingesting data generated intra-app takes the program physical time
> equal to 40% of window size + watermark.
>
> Using strace, I measured that checkpoint directory for the stream writer
> receives the following number of Linux system calls:
>
> create/open file = 60,500 calls
> mkdir = 57,000

Re: Arbitrary stateful aggregation: updating state without setting timeout

2020-10-05 Thread Jungtaek Lim
Hi,

That's not explained in the SS guide doc, but it is explained in the Scala API doc.
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/GroupState.html

The following statement, quoted from the Scala API doc, answers your question:

> The timeout is reset every time the function is called on a group, that is,
> when the group has new data, or the group has timed out. So the user has to
> set the timeout duration every time the function is called, otherwise there
> will not be any timeout set.


Simply put, you'd always want to set the timeout unless you are removing the
state for the group (key).
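
A toy model of the documented behaviour, in plain Python. ToyGroupState and
call_user_function are made-up stand-ins, not Spark classes; the only thing
modelled is "the timeout is reset on every call".

```python
class ToyGroupState:
    """Toy stand-in for GroupState; not Spark's class."""

    def __init__(self):
        self.value = None
        self.timeout_at = None  # None means "no timeout registered"

    def update(self, value):
        self.value = value

    def set_timeout_duration(self, now_ms, duration_ms):
        self.timeout_at = now_ms + duration_ms


def call_user_function(state, now_ms, func):
    # Per the quoted API doc: the timeout is reset on every call, so the
    # function has to set it again if it still wants one.
    state.timeout_at = None
    func(state, now_ms)


def first_batch(state, now_ms):
    state.update("a")
    state.set_timeout_duration(now_ms, 1000)


def second_batch(state, now_ms):
    state.update("b")  # forgets to set the timeout again


state = ToyGroupState()
call_user_function(state, 0, first_batch)
call_user_function(state, 500, second_batch)
print(state.value, state.timeout_at)
```

After the second call the toy state holds the new value but no timeout,
which matches the scenario asked about below: without setting the timeout
again, the group is not expected to time out.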

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Oct 5, 2020 at 6:16 PM Yuri Oleynikov (יורי אולייניקוב) <
yur...@gmail.com> wrote:

> Hi all, I have following question:
>
> What happens to the state (in terms of expiration) if I’m updating the
> state without setting timeout?
>
>
> E.g. in FlatMapGroupsWithStateFunction
>
>    1. first batch:
>
>       state.update(myObj)
>       state.setTimeoutDuration(timeout)
>
>    2. second batch:
>
>       state.update(myObj)
>
>    3. third batch (no data for a long time):
>
>       Has the state timed out after the initial timeout expired, or has it
>       not timed out?
>
>


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Jungtaek Lim
Hi Amit,

If I remember correctly, you don't need to restart the query to pick up a
newly added topic or partition, as long as your subscription covers the topic
(e.g. via a subscribe pattern). Please try it out.
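
A small sketch of what "the subscription covers the topic" means for a
pattern subscription. The option name subscribePattern is the Kafka source's
own; the broker address, pattern and topic names are hypothetical, and
Python's re.fullmatch only approximates the Java regex matching used on the
Kafka side.

```python
import re

# Options as they could be passed to spark.readStream.format("kafka").options(**kafka_options).
kafka_options = {
    "kafka.bootstrap.servers": "broker:9092",  # hypothetical address
    "subscribePattern": "events-.*",           # hypothetical topic pattern
}


def covered_by_subscription(topic: str, options: dict) -> bool:
    """True if the whole topic name matches the subscribePattern regex."""
    return re.fullmatch(options["subscribePattern"], topic) is not None


for topic in ["events-eu", "events-us-new", "audit-log"]:
    print(topic, covered_by_subscription(topic, kafka_options))
```

A topic created later whose name matches the pattern would fall under the
same subscription, while a topic outside the pattern would not.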

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Aug 28, 2020 at 1:56 PM Amit Joshi 
wrote:

> Any pointers will be appreciated.
>
> On Thursday, August 27, 2020, Amit Joshi 
> wrote:
>
>> Hi All,
>>
>> I am trying to understand the effect of adding topics and partitions to a
>> topic in kafka, which is being consumed by spark structured streaming
>> applications.
>>
>> Do we have to restart the spark structured streaming application to read
>> from the newly added topic?
>> Do we have to restart the spark structured streaming application to read
>> from the newly added partition to a topic?
>>
>> Kafka consumers have a meta data refresh property that works without
>> restarting.
>>
>> Thanks advance.
>>
>> Regards
>> Amit Joshi
>>
>


Re: Structured Streaming metric for count of delayed/late data

2020-08-22 Thread Jungtaek Lim
I proposed another approach which provides an accurate count, though the
number doesn't always mean the rows were dropped. (See
https://github.com/apache/spark/pull/24936 for details.)

Btw, the limitation only applies to streaming aggregation, so you can
implement the aggregation yourself via (flat)MapGroupsWithState - note
that local aggregation is an "optimization", so you may need to account
for the performance impact of losing it.

On Sat, Aug 22, 2020 at 1:29 PM GOEL Rajat 
wrote:

> Thanks for pointing me to the Spark ticket and its limitations. Will try
> these changes.
>
> Is there any workaround for this limitation of inaccurate count, maybe by
> adding some additional streaming operation in SS job without impacting perf
> too much ?
>
>
>
> Regards,
>
> Rajat
>
>
>
> *From: *Jungtaek Lim 
> *Date: *Friday, 21 August 2020 at 12:07 PM
> *To: *Yuanjian Li 
> *Cc: *GOEL Rajat , "user@spark.apache.org" <
> user@spark.apache.org>
> *Subject: *Re: Structured Streaming metric for count of delayed/late data
>
>
>
> One more thing to say, unfortunately, the number is not accurate compared
> to the input rows on streaming aggregation, because Spark does
> local-aggregate and counts dropped inputs based on "pre-locally-aggregated"
> rows. You may want to treat the number as whether dropping inputs is
> happening or not.
>
>
>
> On Fri, Aug 21, 2020 at 3:31 PM Yuanjian Li 
> wrote:
>
> The metrics have been added in
> https://issues.apache.org/jira/browse/SPARK-24634, but the target version
> is 3.1.
>
> Maybe you can backport for testing since it's not a big change.
>
>
>
> Best,
>
> Yuanjian
>
>
>
> On Thu, Aug 20, 2020 at 9:14 PM GOEL Rajat wrote:
>
> Hi All,
>
>
>
> I have a query if someone can please help. Is there any metric or
> mechanism of printing count of input records dropped due to watermarking
> (late data count) in a stream, during a window based aggregation, in
> Structured Streaming ? I am using Spark 3.0.
>
>
>
> Thanks & Regards,
>
> Rajat
>
>


Re: Structured Streaming metric for count of delayed/late data

2020-08-21 Thread Jungtaek Lim
One more thing to say: unfortunately, the number is not accurate relative
to the input rows on streaming aggregation, because Spark does a local
aggregation first and counts dropped inputs based on "pre-locally-aggregated"
rows. You may want to treat the number only as an indicator of whether
inputs are being dropped or not.

On Fri, Aug 21, 2020 at 3:31 PM Yuanjian Li  wrote:

> The metrics have been added in
> https://issues.apache.org/jira/browse/SPARK-24634, but the target version
> is 3.1.
> Maybe you can backport for testing since it's not a big change.
>
> Best,
> Yuanjian
>
> On Thu, Aug 20, 2020 at 9:14 PM GOEL Rajat wrote:
>
>> Hi All,
>>
>>
>>
>> I have a query if someone can please help. Is there any metric or
>> mechanism of printing count of input records dropped due to watermarking
>> (late data count) in a stream, during a window based aggregation, in
>> Structured Streaming ? I am using Spark 3.0.
>>
>>
>>
>> Thanks & Regards,
>>
>> Rajat
>>
>


Re: Query around Spark Checkpoints

2020-09-27 Thread Jungtaek Lim
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

You would need to implement CheckpointFileManager yourself. It is
tightly integrated with HDFS (parameters and return types of methods are
mostly from HDFS). That doesn't mean it's impossible to
implement CheckpointFileManager against a non-filesystem, but it'd be
non-trivial to override all of the functionality and make it work
seamlessly.

The required consistency is documented in the javadoc of CheckpointFileManager -
please read through it, and evaluate whether your target storage can
fulfill the requirements.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Sep 28, 2020 at 3:04 AM Amit Joshi 
wrote:

> Hi,
>
> As far as I know, it depends on whether you are using spark streaming or
> structured streaming.
> In spark streaming you can write your own code to checkpoint.
> But in case of structured streaming it should be file location.
> But main question in why do you want to checkpoint in
> Nosql, as it's eventual consistence.
>
>
> Regards
> Amit
>
> On Sunday, September 27, 2020, Debabrata Ghosh 
> wrote:
>
>> Hi,
>> I had a query around Spark checkpoints - Can I store the checkpoints
>> in NoSQL or Kafka instead of Filesystem ?
>>
>> Regards,
>>
>> Debu
>>
>


Re: Query around Spark Checkpoints

2020-09-29 Thread Jungtaek Lim
Sorry, I'm not familiar with Delta Lake. You may get a better answer from the
Delta Lake mailing list.

One thing is clear: stateful processing is simply an essential feature
of almost every streaming framework. If you're struggling with something
around the state feature and trying to find a workaround, then probably
something is going wrong. Please feel free to share it.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Sep 30, 2020 at 1:14 AM Bryan Jeffrey wrote:

> Jungtaek,
>
> How would you contrast stateful streaming with checkpoint vs. the idea of
> writing updates to a Delta Lake table, and then using the Delta Lake table
> as a streaming source for our state stream?
>
> Thank you,
>
> Bryan
>
> On Mon, Sep 28, 2020 at 9:50 AM Debabrata Ghosh 
> wrote:
>
>> Thank You Jungtaek and Amit ! This is very helpful indeed !
>>
>> Cheers,
>>
>> Debu
>>
>> On Mon, Sep 28, 2020 at 5:33 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>>
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
>>>
>>> You would need to implement CheckpointFileManager by yourself, which is
>>> tightly integrated with HDFS (parameters and return types of methods are
>>> mostly from HDFS). That wouldn't mean it's impossible to
>>> implement CheckpointFileManager against a non-filesystem, but it'd be
>>> non-trivial to override all of the functionalities and make it work
>>> seamlessly.
>>>
>>> Required consistency is documented via javadoc of CheckpointFileManager
>>> - please go through reading it, and evaluate whether your target storage
>>> can fulfill the requirement.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Mon, Sep 28, 2020 at 3:04 AM Amit Joshi 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> As far as I know, it depends on whether you are using spark streaming
>>>> or structured streaming.
>>>> In spark streaming you can write your own code to checkpoint.
>>>> But in case of structured streaming it should be file location.
>>>> But main question in why do you want to checkpoint in
>>>> Nosql, as it's eventual consistence.
>>>>
>>>>
>>>> Regards
>>>> Amit
>>>>
>>>> On Sunday, September 27, 2020, Debabrata Ghosh 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I had a query around Spark checkpoints - Can I store the
>>>>> checkpoints in NoSQL or Kafka instead of Filesystem ?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Debu
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>


Re: RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Jungtaek Lim
Hi,

You need to add the prefix "kafka." to the configurations which should be
propagated to Kafka. Other options are used by the Spark data source
itself (the Kafka connector in this case).
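
A minimal sketch of the prefix rule in plain Python. The function name, the
broker address, the topic and the 2 MiB value are all illustrative
assumptions; only the "kafka." prefix convention comes from the linked guide.
ProducerConfig.MAX_REQUEST_SIZE_CONFIG is the string "max.request.size", so
the option in the question below was passed without the prefix.

```python
def split_kafka_options(options: dict):
    """Separate 'kafka.'-prefixed options (forwarded to the Kafka client with
    the prefix stripped) from options kept by the Spark data source."""
    prefix = "kafka."
    client = {k[len(prefix):]: v for k, v in options.items() if k.startswith(prefix)}
    source = {k: v for k, v in options.items() if not k.startswith(prefix)}
    return client, source


writer_options = {
    "kafka.bootstrap.servers": "broker:9092",  # hypothetical address
    "kafka.max.request.size": "2097152",       # illustrative value (2 MiB)
    "topic": "output-topic",                   # hypothetical topic
}
client, source = split_kafka_options(writer_options)
print(client)
print(source)
```

In a real query the same dictionary could be passed as
.writeStream.format("kafka").options(**writer_options).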

https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Tue, May 26, 2020 at 6:42 AM Something Something <
mailinglist...@gmail.com> wrote:

> I keep getting this error message:
>
>
> *The message is 1169350 bytes when serialized which is larger than the
> maximum request size you have configured with the max.request.size
> configuration.*
>
>
>
> As indicated in other posts, I am trying to set the “max.request.size”
> configuration in the Producer as follows:
>
>
> -
>
> .writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", conig.outputBootstrapServer)
>   .option(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000")
>
> -
>
>
>
> But this is not working. Am I setting this correctly? Is there a different
> way to set this property under Spark Structured Streaming?
>
>
> Please help. Thanks.
>
>
>


Re: States get dropped in Structured Streaming

2020-10-23 Thread Jungtaek Lim
Unfortunately, the information you've given doesn't tell whether rows in the
state are being evicted correctly on watermark advance, or whether there's an
unknown bug through which some rows in the state are silently dropped. I
haven't heard of the latter - you'd probably want to double check, focusing
on watermark advance. If it turns out to be the latter, you'll probably need
to modify the Spark code to inject debug logging.

On Fri, Oct 23, 2020 at 3:12 PM Eric Beabes 
wrote:

> We're using Stateful Structured Streaming in Spark 2.4. We are noticing
> that when the load on the system is heavy & LOTs of messages are coming in
> some of the states disappear with no error message. Any suggestions on how
> we can debug this? Any tips for fixing this?
>
> Thanks in advance.
>


Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Jungtaek Lim
Please provide logs and a heap dump file for the OOM case - otherwise no one
can say what the cause is.

Add these JVM options to the driver/executor => -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath="...dir..."
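
One way to pass these JVM options is through the
spark.driver.extraJavaOptions and spark.executor.extraJavaOptions settings.
The sketch below only builds the corresponding spark-submit arguments; the
helper names and the dump directory /tmp/dumps are assumptions for
illustration.

```python
def heap_dump_conf(dump_dir: str) -> dict:
    """Spark settings that add heap-dump-on-OOM flags to driver and executor JVMs."""
    jvm_opts = f"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath={dump_dir}"
    return {
        "spark.driver.extraJavaOptions": jvm_opts,
        "spark.executor.extraJavaOptions": jvm_opts,
    }


def as_submit_args(conf: dict) -> list:
    """Turn a settings dict into a flat list of --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(as_submit_args(heap_dump_conf("/tmp/dumps")))  # hypothetical directory
```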

On Sun, Jul 19, 2020 at 6:56 PM Rachana Srivastava
 wrote:

> *Issue:* I am trying to process 5000+ files of gzipped json file
> periodically from S3 using Structured Streaming code.
>
> *Here are the key steps:*
>
> 1. Read the JSON schema and broadcast it to executors.
>
> 2. Read the stream:
>
>    Dataset inputDS = sparkSession.readStream()
>        .format("text")
>        .option("inferSchema", "true")
>        .option("header", "true")
>        .option("multiLine", true)
>        .schema(jsonSchema)
>        .option("mode", "PERMISSIVE")
>        .json(inputPath + "/*");
>
> 3. Process each file in a map:
>
>    Dataset ds = inputDS.map(x -> { ... }, Encoders.STRING());
>
> 4. Write the output to S3:
>
>    StreamingQuery query = ds
>        .coalesce(1)
>        .writeStream()
>        .outputMode("append")
>        .format("csv")
>        ...
>        .start();
>
> *maxFilesPerTrigger* is set to 500, so I was hoping the stream would
> pick up only that many files to process. Why are we getting OOM? If in a we
> have more than 3500 files then system crashes with OOM.
>
>


Re: Lazy Spark Structured Streaming

2020-08-02 Thread Jungtaek Lim
SPARK-24156 runs a no-data batch to apply the updated watermark, but the
updated watermark may still not be enough to evict all state rows (e.g.
depending on the window and the lateness allowed by the watermark).
You'll still need to provide a dummy input record to advance the watermark, so
that all expected state rows can be evicted.
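
A toy model of why the last windows stay in state until later input
arrives. It assumes the watermark is the maximum event time seen minus the
delay, and that a window is finalized once its end is at or before the
watermark; the numbers are made up and the function names are not Spark APIs.

```python
def watermark(max_event_time: int, delay: int) -> int:
    return max_event_time - delay


def finalized_windows(window_ends, max_event_time: int, delay: int):
    """Windows whose end is at or before the watermark (assumed eviction rule)."""
    wm = watermark(max_event_time, delay)
    return [end for end in window_ends if end <= wm]


# Times in minutes: 2-minute windows ending at 2, 4 and 6; 10-minute delay.
window_ends = [2, 4, 6]
print(finalized_windows(window_ends, max_event_time=6, delay=10))   # []
print(finalized_windows(window_ends, max_event_time=14, delay=10))  # [2, 4]
print(finalized_windows(window_ends, max_event_time=16, delay=10))  # [2, 4, 6]
```

If the last real event has timestamp 6, no window is finalized however many
no-data batches run. Only a record with a later event time (here 16 or more)
moves the watermark far enough to release the final window.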

On Sun, Aug 2, 2020 at 5:44 PM Phillip Henry 
wrote:

> Thanks, Jungtaek. Very useful information.
>
> Could I please trouble you with one further question - what you said makes
> perfect sense but to what exactly does SPARK-24156
> <https://issues.apache.org/jira/browse/SPARK-24156> refer if not fixing
> the "need to add a dummy record to move watermark forward"?
>
> Kind regards,
>
> Phillip
>
>
>
>
> On Mon, Jul 27, 2020 at 11:41 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> I'm not sure what exactly your problem is, but given you've mentioned
>> window and OutputMode.Append, you may want to remind that append mode
>> doesn't produce the output of aggregation unless the watermark "passes by".
>> It's expected behavior if you're seeing lazy outputs on OutputMode.Append
>> compared to OutputMode.Update.
>>
>> Unfortunately there's no mechanism on SSS to move forward only watermark
>> without actual input, so if you want to test some behavior on
>> OutputMode.Append you would need to add a dummy record to move watermark
>> forward.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Mon, Jul 27, 2020 at 8:10 PM Phillip Henry 
>> wrote:
>>
>>> Sorry, should have mentioned that Spark only seems reluctant to take the
>>> last windowed, groupBy batch from Kafka when using OutputMode.Append.
>>>
>>> I've asked on StackOverflow:
>>>
>>> https://stackoverflow.com/questions/62915922/spark-structured-streaming-wont-pull-the-final-batch-from-kafka
>>> but am still struggling. Can anybody please help?
>>>
>>> How do people test their SSS code if you have to put a message on Kafka
>>> to get Spark to consume a batch?
>>>
>>> Kind regards,
>>>
>>> Phillip
>>>
>>>
>>> On Sun, Jul 12, 2020 at 4:55 PM Phillip Henry 
>>> wrote:
>>>
>>>> Hi, folks.
>>>>
>>>> I noticed that SSS won't process a waiting batch if there are no
>>>> batches after that. To put it another way, Spark must always leave one
>>>> batch on Kafka waiting to be consumed.
>>>>
>>>> There is a JIRA for this at:
>>>>
>>>> https://issues.apache.org/jira/browse/SPARK-24156
>>>>
>>>> that says it's resolved in 2.4.0 but my code
>>>> <https://github.com/PhillHenry/SSSPlayground/blob/Spark2/src/test/scala/uk/co/odinconsultants/sssplayground/windows/TimestampedStreamingSpec.scala>
>>>> is using 2.4.2 yet I still see Spark reluctant to consume another batch
>>>> from Kafka if it means there is nothing else waiting to be processed in the
>>>> topic.
>>>>
>>>> Do I have to do something special to exploit the behaviour that
>>>> SPARK-24156 says it has addressed?
>>>>
>>>> Regards,
>>>>
>>>> Phillip
>>>>
>>>>
>>>>
>>>>


Re: [SPARK-STRUCTURED-STREAMING] IllegalStateException: Race while writing batch 4

2020-08-12 Thread Jungtaek Lim
The file stream sink doesn't support this. There are several
approaches to achieve it:

1) have the two queries write to Kafka (or any intermediate storage which
allows concurrent writes), and let a downstream Spark application read from
it and write to the final path
2) have the two queries write to two different directories, and let a
downstream Spark application read from them and write to the final path
3) use an alternative data source which supports concurrent writes when
writing files (you may want to check Delta Lake, Apache Hudi, Apache Iceberg
for such functionality - though you'd probably need to learn many other
things to keep the table in good shape)

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sat, Aug 8, 2020 at 4:19 AM Amit Joshi  wrote:

> Hi,
>
> I have 2spark structure streaming queries writing to the same outpath in
> object storage.
> Once in a while I am getting the "IllegalStateException: Race while
> writing batch 4".
> I found that this error is because there are two writers writing to the
> output path. The file streaming sink doesn't support multiple writers.
> It assumes there is only one writer writing to the path. Each query needs
> to use its own output directory.
>
> Is there a way to write the output to the same path by both queries, as I
> need the output at the same path.?
>
> Regards
> Amit Joshi
>


Re: Pyspark: Issue using sql in foreachBatch sink

2020-07-31 Thread Jungtaek Lim
Python doesn't allow omitting () on a call with no parameters, whereas Scala
does. Use `write()`, not `write`.
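
The same effect can be reproduced in plain Python. In the quoted code,
`outdf` comes from `bdf._jdf.sparkSession()`, which appears to be the
py4j-wrapped Java session, so Java-style method calls apply to it. The two
classes below are made-up stand-ins, not PySpark or py4j classes.

```python
class JavaLikeWriter:
    def format(self, name):
        return f"writer configured for {name}"


class JavaLikeDataset:
    # On the Java/Scala side `write` is a zero-argument method, not a property.
    def write(self):
        return JavaLikeWriter()


ds = JavaLikeDataset()
print(ds.write().format("csv"))  # works: the method is called first

try:
    ds.write.format("csv")  # `ds.write` is just the method object itself
except AttributeError as err:
    print("AttributeError:", err)
```

An alternative that avoids Java objects altogether would be to obtain a
Python-side session and keep working with regular PySpark DataFrames, where
`write` is a property.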

On Wed, Jul 29, 2020 at 9:09 AM muru  wrote:

> In a pyspark SS job, trying to use sql instead of sql functions in
> foreachBatch sink
> throws AttributeError: 'JavaMember' object has no attribute 'format'
> exception.
> However, the same thing works in Scala API.
>
> Please note, I tested in spark 2.4.5/2.4.6 and 3.0.0 and got the same
> exception.
> Is it a bug or known issue with Pyspark implementation? I noticed that I
> could perform other operations except the write method.
>
> Please, let me know how to fix this issue.
>
> See below code examples
> # Spark Scala method
> def processData(batchDF: DataFrame, batchId: Long) {
>batchDF.createOrReplaceTempView("tbl")
>val outdf=batchDF.sparkSession.sql("select action, count(*) as count
> from tbl where date='2020-06-20' group by 1")
>outdf.printSchema()
>outdf.show
>outdf.coalesce(1).write.format("csv").save("/tmp/agg")
> }
>
> ## pyspark python method
> def process_data(bdf, bid):
>   lspark = bdf._jdf.sparkSession()
>   bdf.createOrReplaceTempView("tbl")
>   outdf=lspark.sql("select action, count(*) as count from tbl where
> date='2020-06-20' group by 1")
>   outdf.printSchema()
>   # it works
>   outdf.show()
>   # throws AttributeError: 'JavaMember' object has no attribute 'format'
> exception
>   outdf.coalesce(1).write.format("csv").save("/tmp/agg1")
>
> Here is the full exception
> 20/07/24 16:31:24 ERROR streaming.MicroBatchExecution: Query [id =
> 854a39d0-b944-4b52-bf05-cacf998e2cbd, runId =
> e3d4dc7d-80e1-4164-8310-805d7713fc96] terminated with error
> py4j.Py4JException: An exception was raised by the Python Proxy. Return
> Message: Traceback (most recent call last):
>   File
> "/Users/muru/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 2381, in _call_proxy
> return_value = getattr(self.pool[obj_id], method)(*params)
>   File "/Users/muru/spark/python/pyspark/sql/utils.py", line 191, in call
> raise e
> AttributeError: 'JavaMember' object has no attribute 'format'
> at py4j.Protocol.getReturnValue(Protocol.java:473)
> at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
> at com.sun.proxy.$Proxy20.call(Unknown Source)
> at
> org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
> at
> org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
> at
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org
> $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org
> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)

Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-05 Thread Jungtaek Lim
The Structured Streaming programming guide has sections that answer exactly
these questions:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
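
As a rough illustration of what those two sections cover, here is a minimal PySpark-side sketch. It assumes `query` is the StreamingQuery handle returned by `writeStream.start()`; the helper names `summarize_progress` and `monitor_then_stop` are made up for this example and are not part of Spark. Only `query.status`, `query.lastProgress` and `query.stop()` are actual StreamingQuery members.

```python
import time
from typing import Any, Dict, Optional


def summarize_progress(progress: Optional[Dict[str, Any]]) -> str:
    """Render one line from the dict exposed by StreamingQuery.lastProgress."""
    if progress is None:
        # lastProgress is None until the first progress update is available.
        return "no progress reported yet"
    return "batch={} rows={} in={:.1f}/s processed={:.1f}/s".format(
        progress.get("batchId"),
        progress.get("numInputRows", 0),
        progress.get("inputRowsPerSecond", 0.0),
        progress.get("processedRowsPerSecond", 0.0),
    )


def monitor_then_stop(query, polls: int = 3, interval_s: float = 10.0) -> None:
    """Hypothetical helper: poll a running query a few times, then stop it."""
    for _ in range(polls):
        print(query.status)  # whether a trigger is active, data is available, etc.
        print(summarize_progress(query.lastProgress))
        time.sleep(interval_s)
    # Stopping ends this run. To "restart", start a new query from the same
    # writeStream definition with the same checkpointLocation.
    query.stop()
```

A stopped query object is not restarted in place; a new query started against the same checkpoint location picks up where the previous run left off.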

Also, for the Kafka data source, there's a third-party project (DISCLAIMER:
I'm the author) that helps you commit offsets to Kafka under a specific
group ID.

https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer

After that, you can also leverage the Kafka ecosystem to monitor progress
from Kafka's point of view, especially the gap between the highest offset
and the committed offset.
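
The "gap" here is the per-partition consumer lag. A small sketch of the arithmetic follows, assuming you have already obtained the two offset maps from some Kafka tool or client; the function name and the choice to treat a partition with no commit as offset 0 are assumptions for illustration only.

```python
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: highest (log-end) offset minus committed offset."""
    lag = {}
    for partition, end in end_offsets.items():
        # Assumption: a partition without a committed offset counts from 0,
        # so its whole log is reported as lag.
        lag[partition] = max(end - committed.get(partition, 0), 0)
    return lag


# Example: partition 0 is 10 records behind, partition 1 has no commit yet.
lag = consumer_lag({0: 100, 1: 50}, {0: 90})
total_lag = sum(lag.values())
```

Kafka's own `kafka-consumer-groups.sh --describe` reports the same quantity in its LAG column once offsets are committed under a group ID.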

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Jul 6, 2020 at 2:53 AM Gabor Somogyi 
wrote:

> The community just added a Structured Streaming tab to the UI in 3.0.
>
> On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed, 
> wrote:
>
>> Hi,
>>
>> We are trying to move our existing code from Spark DStreams to Structured
>> Streaming for one of our old applications, which we built a few years ago.
>>
>> A Structured Streaming job doesn't have a Streaming tab in the Spark UI.
>> Is there a way to monitor a job we submitted in Structured Streaming?
>> Since the job runs on every trigger, how can we kill and restart it if
>> needed?
>>
>> Any suggestions on this, please?
>>
>> Thanks,
>> Asmath
>>


Re: Failure Threshold in Spark Structured Streaming?

2020-07-02 Thread Jungtaek Lim
Structured Streaming basically follows SQL semantics, which have no notion
of a "maximum number of allowed failures". If you'd like to tolerate
malformed data, read it in a raw format (string or binary), which won't
fail on such data, and then try converting it. For example, from_json()
produces null if the data is malformed, so you can easily filter those rows
out later.


On Fri, Jul 3, 2020 at 1:24 AM Eric Beabes  wrote:

> Currently my job fails even on a single failure. In other words, even if
> one incoming message is malformed the job fails. I believe there's a
> property that allows us to set an acceptable number of failures. I Googled
> but couldn't find the answer. Can someone please help? Thanks.
>
>

