re: should we dump a warning if we drop batches due to window move?

2018-08-03 Thread Peter Liu
Hello there,

I have a quick question for the following case:

Situation:
A Spark consumer is able to process 5 batches in 10 sec (where the batch
interval is zero by default - correct me if this is wrong). The window size
is 10 sec (non-overlapping sliding, i.e. a tumbling window).
There are some fluctuations in the incoming message arrival rate, resulting
in a slightly higher incoming rate than the consumer is able to handle -
say, sometimes 6 batches' worth of data arrive within 10 sec, for about 5
minutes ...

Question:
Would Spark 2.2 drop the 6th batch when the 10 sec window moves on, or do
unprocessed batches keep accumulating?
If batches are dropped, would a warning be written to the log?

I can see a warning (attached below) when batch processing takes more time
than an explicitly set batch/trigger interval (which is not the case here).
I would expect a similar warning in the log when batches have to be dropped
in the case above, but I can't find this type of warning anywhere.
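
(for context, this is what I mean by an explicitly set batch/trigger
interval - a minimal sketch of a processing-time trigger on the consumer
side, with the dataframe/query names assumed, untested as written:)

    import org.apache.spark.sql.streaming.Trigger

    // assumed stream name; the point is only the explicit 10 sec trigger -
    // without it, structured streaming uses a default trigger interval of 0 ms
    // (i.e. it starts the next micro-batch as soon as the previous one finishes)
    val query = windowedCountsDf.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()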

Maybe I was just looking for the wrong text in the log? In general, is the
expectation reasonable? (I can't find anything about this here:
https://spark.apache.org/docs/2.2.1/streaming-programming-guide.html or
from general googling.)

any comments/suggestions would be very much appreciated!

Thanks,

Peter

18/08/03 00:33:43 WARN streaming.ProcessingTimeExecutor: Current batch is
falling behind. The trigger interval is 1 milliseconds, but spent 11965
milliseconds
18/08/03 00:33:55 WARN streaming.ProcessingTimeExecutor: Current batch is
falling behind. The trigger interval is 1 milliseconds, but spent 11266
milliseconds


re: overcommit: cpus / vcores

2018-10-15 Thread Peter Liu
Hi there,

I have a system with 80 vcores and a relatively light Spark streaming
workload. Overcommitting the vcore resource (i.e. > 80) in the config (see
(a) below) seems to help improve the average Spark batch time (see (b)
below).

Is there any best-practice guideline on resource overcommit with cpu /
vcores, such as yarn config options, candidate cases ideal for
overcommitting vcores, etc.?

The slides below (from 2016 though) seem to address the memory overcommit
topic and hint at a "future" topic on cpu overcommit:
https://www.slideshare.net/HadoopSummit/investing-the-effects-of-overcommitting-yarn-resources


I would like to know whether this is a reasonable config practice and why
the improvement is not achievable without overcommit. Any help/hint would
be very much appreciated!

Thanks!

Peter

(a) yarn-site.xml

<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>110</value>
</property>

<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>110</value>
</property>

(b)
FYI:
I have a system with 80 vcores and a relatively light Spark streaming
workload. Overcommitting the vcore resource (here 100) seems to help the
average Spark batch time. I need more understanding of this practice.

Skylake (1 x 900K msg/sec)   total batch# (avg)   avg batch time in ms (avg)   avg user cpu (%)   nw read (mb/sec)
70 vcores                    178.20               8154.69                      n/a                n/a
80 vcores                    177.40               7865.44                      27.85              222.31
100 vcores                   177.00               7209.37                      30.02              220.86


Re: overcommit: cpus / vcores

2018-10-15 Thread Peter Liu
 Hi Khaled,

I have attached the spark streaming config below in (a).
In the case of the 100 vcore run (see the initial email), I used 50
executors where each executor has 2 vcores and 3g memory; for the 70 vcore
case, 35 executors, and for the 80 vcore case, 40 executors.
In the yarn config (yarn-site.xml, (b) below), the available vcores are set
to a value over 80 (I call this "overcommit").
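
(for clarity, a minimal sketch of how the 100 vcore case is sized on the
spark side - assuming the settings are passed through the SparkSession
builder here; in my runs they go in as --conf options with the same keys:)

    import org.apache.spark.sql.SparkSession

    // 50 executors x 2 vcores = 100 vcores total, 3g heap per executor
    val spark = SparkSession.builder()
      .appName("streaming-consumer")              // assumed app name
      .config("spark.executor.instances", "50")
      .config("spark.executor.cores", "2")
      .config("spark.executor.memory", "3g")
      .getOrCreate()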

I am not sure whether there is a more proper way to do this (overcommit),
or what the best practice would be in this type of situation (say, a light
cpu workload on a dedicated yarn cluster) to increase cpu utilization for
better performance.

Any help would be very much appreciated.

Thanks ...

Peter

(a)

   val df = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
     .option("startingOffsets", "latest")
     .option("subscribe", Variables.EVENTS_TOPIC)
     .option("kafkaConsumer.pollTimeoutMs", "5000")
     .load()
     .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
     .select(from_json($"value", mySchema).as("data"), $"timestamp")
     .select("data.*", "timestamp")
     .where($"event_type" === "view")
     .select($"ad_id", $"event_time")
     .join(campaigns.toSeq.toDS().cache(), Seq("ad_id"))
     .groupBy(millisTime(window($"event_time", "10 seconds").getField("start")) as 'time_window, $"campaign_id")
     .agg(count("*") as 'count, max('event_time) as 'lastUpdate)
     .select(to_json(struct("*")) as 'value)
     .writeStream
     .format("kafka")
     .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString) // original
     .option("topic", Variables.OUTPUT_TOPIC)
     .option("checkpointLocation", s"/tmp/${java.util.UUID.randomUUID()}") // TBD: ram disk?
     .outputMode("update")
     .start()

(b)

<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>110</value>
</property>

<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>110</value>
</property>

On Mon, Oct 15, 2018 at 4:26 PM Khaled Zaouk  wrote:

> Hi Peter,
>
> What parameters are you putting in your spark streaming configuration?
> What are you putting as number of executor instances and how many cores per
> executor are you setting in your Spark job?
>
> Best,
>
> Khaled
>
> On Mon, Oct 15, 2018 at 9:18 PM Peter Liu  wrote:
>
>> Hi there,
>>
>> I have a system with 80 vcores and a relatively light spark streaming
>> workload. Overcomming the vcore resource (i.e. > 80) in the config (see (a)
>> below) seems to help to improve the average spark batch time (see (b)
>> below).
>>
>> Is there any best practice guideline on resource overcommit with cpu /
>> vcores, such as yarn config options, candidate cases ideal for
>> overcommiting vcores etc.?
>>
>> the slide below (from 2016 though) seems to address the memory overcommit
>> topic and hint a "future" topic on cpu overcommit:
>>
>> https://www.slideshare.net/HadoopSummit/investing-the-effects-of-overcommitting-yarn-resources
>>
>> Would like to know if this is a reasonable config practice and why this
>> is not achievable without overcommit. Any help/hint would be very much
>> appreciated!
>>
>> Thanks!
>>
>> Peter
>>
>> (a) yarn-site.xml
>> <property>
>> <name>yarn.nodemanager.resource.cpu-vcores</name>
>> <value>110</value>
>> </property>
>>
>> <property>
>> <name>yarn.scheduler.maximum-allocation-vcores</name>
>> <value>110</value>
>> </property>
>>
>>
>> (b)
>> FYI:
>> I have a system with 80 vcores and a relatively light spark streaming
>> workload. overcomming the vocore resource (here 100) seems to help the
>> average spark batch time. need more understanding on this practice.
>> Skylake (1 x 900K msg/sec)   total batch# (avg)   avg batch time in ms (avg)   avg user cpu (%)   nw read (mb/sec)
>> 70 vcores                    178.20               8154.69                      n/a                n/a
>> 80 vcores                    177.40               7865.44                      27.85              222.31
>> 100 vcores                   177.00               7209.37                      30.02              220.86
>>
>>


Re: configure yarn to use more vcores than the node provides?

2018-10-16 Thread Peter Liu
Hi Khaled,

The 50-2-3g I mentioned below refers to the --conf spark.executor.* config,
in particular spark.executor.instances=50, spark.executor.cores=2 and
spark.executor.memory=3g.
For each run, I configured the streaming producer and the kafka broker so
that the partitions are aligned with the consumer side (in this case, 100
partitions), viewable on the Spark UI.

The yarn scheduler does not seem to check the actual hardware threads
(logical cores) on the node. On the other hand, there seems to be some
mechanism in yarn to allow overcommit for certain jobs (with high/low
watermarks etc.).

I am not sure how this should work as a good practice. I would appreciate
any hint from the experts here.

Thanks for your reply!

Peter / Gang

On Tue, Oct 16, 2018 at 3:43 AM Khaled Zaouk  wrote:

> Hi Peter,
>
> I actually meant the spark configuration that you put in your spark-submit
> program (such as --conf spark.executor.instances= ..., --conf
> spark.executor.memory= ..., etc...).
>
> I advise you to check the number of partitions that you get in each stage
> of your workload in the Spark GUI while the workload is running. I feel like
> this number is beyond 80, and this is why overcommitting cpu cores can
> achieve better latency if the workload is not cpu intensive.
>
> Another question, did you try different values for spark.executor.cores?
> (for example 3, 4 or 5 cores per executor in addition to 2?) Try to play a
> little bit with this parameter and check how it affects your latency...
>
> Best,
>
> Khaled
>
>
>
> On Tue, Oct 16, 2018 at 3:06 AM Peter Liu  wrote:
>
>> Hi Khaled,
>>
>> I have attached the spark streaming config below in (a).
>> In case of the 100vcore run (see the initial email), I used 50 executors
>> where each executor has 2 vcores and 3g memory. For 70 vcore case, 35
>> executors, for 80 vcore case, 40 executors.
>> In the yarn config (yarn-site.xml, (b) below), the  available vcores set
>> over 80 (I call it "overcommit").
>>
>> Not sure if there is a more proper way to do this (overcommit) and what
>> would be the best practice in this type of situation (say, light cpu
>> workload in a dedicated yarn cluster) to increase the cpu utilization for a
>> better performance.
>>
>> Any help would be very much appreciated.
>>
>> Thanks ...
>>
>> Peter
>>
>> (a)
>>
>>    val df = spark
>>      .readStream
>>      .format("kafka")
>>      .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
>>      .option("startingOffsets", "latest")
>>      .option("subscribe", Variables.EVENTS_TOPIC)
>>      .option("kafkaConsumer.pollTimeoutMs", "5000")
>>      .load()
>>      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
>>      .select(from_json($"value", mySchema).as("data"), $"timestamp")
>>      .select("data.*", "timestamp")
>>      .where($"event_type" === "view")
>>      .select($"ad_id", $"event_time")
>>      .join(campaigns.toSeq.toDS().cache(), Seq("ad_id"))
>>      .groupBy(millisTime(window($"event_time", "10 seconds").getField("start")) as 'time_window, $"campaign_id")
>>      .agg(count("*") as 'count, max('event_time) as 'lastUpdate)
>>      .select(to_json(struct("*")) as 'value)
>>      .writeStream
>>      .format("kafka")
>>      .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString) // original
>>      .option("topic", Variables.OUTPUT_TOPIC)
>>      .option("checkpointLocation", s"/tmp/${java.util.UUID.randomUUID()}") // TBD: ram disk?
>>      .outputMode("update")
>>      .start()
>>
>> (b)
>> <property>
>> <name>yarn.nodemanager.resource.cpu-vcores</name>
>> <value>110</value>
>> </property>
>> <property>
>> <name>yarn.scheduler.maximum-allocation-vcores</name>
>> <value>110</value>
>> </property>
>>
>> On Mon, Oct 15, 2018 at 4:26 PM Khaled Zaouk 
>> wrote:
>>
>>> Hi Peter,
>>>
>>> What parameters are you putting in your spark streaming configuration?
>>> What are you putting as number of executor instances and how many cores per
>>> executor are you setting in your Spark job?
>>>
>>> Best,
>>>
>>> Khaled
>>>
>>> On Mon, Oct 15, 2018 at 9:18 PM Peter Liu  wrote:
>>>

Re: Spark In Memory Shuffle / 5403

2018-10-18 Thread Peter Liu
I would be very interested in the initial question here:

Is there a production-level implementation of a memory-only shuffle,
configurable in a way similar to the MEMORY_ONLY / MEMORY_AND_DISK storage
levels, as mentioned in this ticket:
https://github.com/apache/spark/pull/5403 ?

It would be quite a practical and useful option/feature. Does anyone know
the status of this ticket's implementation?

Thanks!

Peter

On Thu, Oct 18, 2018 at 6:51 AM ☼ R Nair  wrote:

> Thanks..great info. Will try and let all know.
>
> Best
>
> On Thu, Oct 18, 2018, 3:12 AM onmstester onmstester 
> wrote:
>
>> create the ramdisk:
>> mount tmpfs /mnt/spark -t tmpfs -o size=2G
>>
>> then point spark.local.dir to the ramdisk, which depends on your
>> deployment strategy, for me it was through SparkConf object before passing
>> it to SparkContext:
>> conf.set("spark.local.dir","/mnt/spark")
>>
>> To validate that spark is actually using your ramdisk (by default it uses
>> /tmp), ls the ramdisk after running some jobs and you should see spark
>> directories (with date on directory name) on your ramdisk
>>
>>
>> Sent using Zoho Mail 
>>
>>
>>  On Wed, 17 Oct 2018 18:57:14 +0330 *☼ R Nair
>> >* wrote 
>>
>> What are the steps to configure this? Thanks
>>
>> On Wed, Oct 17, 2018, 9:39 AM onmstester onmstester <
>> onmstes...@zoho.com.invalid> wrote:
>>
>>
>> Hi,
>> I failed to config spark for in-memory shuffle so currently just
>> using linux memory mapped directory (tmpfs) as working directory of spark,
>> so everything is fast
>>
>> Sent using Zoho Mail 
>>
>>
>>
>>


Re: Spark In Memory Shuffle / 5403

2018-10-19 Thread Peter Liu
 Hi Peter,

thank you for the reply and the detailed information! Would this be
something comparable to Crail? (
http://crail.incubator.apache.org/blog/2017/11/rdmashuffle.html)
I was looking more for something simple/quick to make the shuffle between
the local jvms faster (like the idea of using a local ram disk) for my
simple use case.

Of course, a general and thorough implementation should cover the shuffle
between the nodes as its major focus. Hmm, it looks like there is no such
implementation within spark itself yet.

very much appreciated!

Peter

On Fri, Oct 19, 2018 at 9:38 AM Peter Rudenko 
wrote:

> Hey Peter, in SparkRDMA shuffle plugin (
> https://github.com/Mellanox/SparkRDMA) we're using mmap of shuffle file,
> to do Remote Direct Memory Access. If the shuffle data is bigger than RAM,
> Mellanox NICs support On-Demand Paging, where the OS invalidates translations
> which are no longer valid due to either non-present pages or mapping
> changes. So if you have an RDMA-capable NIC (or you can try on the Azure cloud
>
> https://azure.microsoft.com/en-us/blog/introducing-the-new-hb-and-hc-azure-vm-sizes-for-hpc/
>  ), have a try. For network intensive apps you should get better
> performance.
>
> Thanks,
> Peter Rudenko
>
> чт, 18 жовт. 2018 о 18:07 Peter Liu  пише:
>
>> I would be very interested in the initial question here:
>>
>> is there a production level implementation for memory only shuffle and
>> configurable (similar to  MEMORY_ONLY storage level,  MEMORY_OR_DISK
>> storage level) as mentioned in this ticket,
>> https://github.com/apache/spark/pull/5403 ?
>>
>> It would be a quite practical and useful option/feature. not sure what is
>> the status of this ticket implementation?
>>
>> Thanks!
>>
>> Peter
>>
>> On Thu, Oct 18, 2018 at 6:51 AM ☼ R Nair 
>> wrote:
>>
>>> Thanks..great info. Will try and let all know.
>>>
>>> Best
>>>
>>> On Thu, Oct 18, 2018, 3:12 AM onmstester onmstester 
>>> wrote:
>>>
>>>> create the ramdisk:
>>>> mount tmpfs /mnt/spark -t tmpfs -o size=2G
>>>>
>>>> then point spark.local.dir to the ramdisk, which depends on your
>>>> deployment strategy, for me it was through SparkConf object before passing
>>>> it to SparkContext:
>>>> conf.set("spark.local.dir","/mnt/spark")
>>>>
>>>> To validate that spark is actually using your ramdisk (by default it
>>>> uses /tmp), ls the ramdisk after running some jobs and you should see spark
>>>> directories (with date on directory name) on your ramdisk
>>>>
>>>>
>>>> Sent using Zoho Mail <https://www.zoho.com/mail/>
>>>>
>>>>
>>>>  On Wed, 17 Oct 2018 18:57:14 +0330 *☼ R Nair
>>>> >* wrote 
>>>>
>>>> What are the steps to configure this? Thanks
>>>>
>>>> On Wed, Oct 17, 2018, 9:39 AM onmstester onmstester <
>>>> onmstes...@zoho.com.invalid> wrote:
>>>>
>>>>
>>>> Hi,
>>>> I failed to config spark for in-memory shuffle so currently just
>>>> using linux memory mapped directory (tmpfs) as working directory of spark,
>>>> so everything is fast
>>>>
>>>> Sent using Zoho Mail <https://www.zoho.com/mail/>
>>>>
>>>>
>>>>
>>>>


Re: Spark In Memory Shuffle / 5403

2018-10-19 Thread Peter Liu
Hi Peter,

Thanks for the additional information - this is really helpful (I
definitely got more than I was looking for :-)

Cheers,

Peter


On Fri, Oct 19, 2018 at 12:53 PM Peter Rudenko 
wrote:

> Hi Peter, we're using a part of Crail - its core library, called disni (
> https://github.com/zrlio/disni/). We couldn't reproduce the results from that
> blog post; in any case, Crail is a more platform-level approach (it comes with
> its own file system), while SparkRDMA is a pluggable approach - it's just a
> plugin that you can enable/disable for a particular workload, you can use
> any hadoop vendor, etc.
>
> The best optimization for shuffle between local jvms could be using
> something like short-circuit local reads (
> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html)
> to use a unix socket for local communication, or to directly read a part
> of another jvm's shuffle file. But yes, it's not available in spark out of
> the box.
>
> Thanks,
> Peter Rudenko
>
> пт, 19 жовт. 2018 о 16:54 Peter Liu  пише:
>
>> Hi Peter,
>>
>> thank you for the reply and detailed information! Would this something
>> comparable with Crail? (
>> http://crail.incubator.apache.org/blog/2017/11/rdmashuffle.html)
>> I was more looking for something simple/quick making the shuffle between
>> the local jvms quicker (like the idea of using local ram disk) for my
>> simple use case.
>>
>> of course, a general and thorough implementation should cover the shuffle
>> between the nodes as major focus. hmm, looks like there is no
>> implementation within spark itself yet.
>>
>> very much appreciated!
>>
>> Peter
>>
>> On Fri, Oct 19, 2018 at 9:38 AM Peter Rudenko 
>> wrote:
>>
>>> Hey Peter, in SparkRDMA shuffle plugin (
>>> https://github.com/Mellanox/SparkRDMA) we're using mmap of shuffle
>>> file, to do Remote Direct Memory Access. If the shuffle data is bigger then
>>> RAM, Mellanox NIC support On Demand Paging, where OS invalidates
>>> translations which are no longer valid due to either non-present pages or
>>> mapping changes. So if you have an RDMA capable NIC (or you can try on
>>> Azure cloud
>>> https://azure.microsoft.com/en-us/blog/introducing-the-new-hb-and-hc-azure-vm-sizes-for-hpc/
>>>  ), have a try. For network intensive apps you should get better
>>> performance.
>>>
>>> Thanks,
>>> Peter Rudenko
>>>
>>> чт, 18 жовт. 2018 о 18:07 Peter Liu  пише:
>>>
>>>> I would be very interested in the initial question here:
>>>>
>>>> is there a production level implementation for memory only shuffle and
>>>> configurable (similar to  MEMORY_ONLY storage level,  MEMORY_OR_DISK
>>>> storage level) as mentioned in this ticket,
>>>> https://github.com/apache/spark/pull/5403 ?
>>>>
>>>> It would be a quite practical and useful option/feature. not sure what
>>>> is the status of this ticket implementation?
>>>>
>>>> Thanks!
>>>>
>>>> Peter
>>>>
>>>> On Thu, Oct 18, 2018 at 6:51 AM ☼ R Nair 
>>>> wrote:
>>>>
>>>>> Thanks..great info. Will try and let all know.
>>>>>
>>>>> Best
>>>>>
>>>>> On Thu, Oct 18, 2018, 3:12 AM onmstester onmstester <
>>>>> onmstes...@zoho.com> wrote:
>>>>>
>>>>>> create the ramdisk:
>>>>>> mount tmpfs /mnt/spark -t tmpfs -o size=2G
>>>>>>
>>>>>> then point spark.local.dir to the ramdisk, which depends on your
>>>>>> deployment strategy, for me it was through SparkConf object before 
>>>>>> passing
>>>>>> it to SparkContext:
>>>>>> conf.set("spark.local.dir","/mnt/spark")
>>>>>>
>>>>>> To validate that spark is actually using your ramdisk (by default it
>>>>>> uses /tmp), ls the ramdisk after running some jobs and you should see 
>>>>>> spark
>>>>>> directories (with date on directory name) on your ramdisk
>>>>>>
>>>>>>
>>>>>> Sent using Zoho Mail <https://www.zoho.com/mail/>
>>>>>>
>>>>>>
>>>>>>  On Wed, 17 Oct 2018 18:57:14 +0330 *☼ R Nair
>>>>>> >* wrote 
>>>>>>
>>>>>> What are the steps to configure this? Thanks
>>>>>>
>>>>>> On Wed, Oct 17, 2018, 9:39 AM onmstester onmstester <
>>>>>> onmstes...@zoho.com.invalid> wrote:
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>> I failed to config spark for in-memory shuffle so currently just
>>>>>> using linux memory mapped directory (tmpfs) as working directory of 
>>>>>> spark,
>>>>>> so everything is fast
>>>>>>
>>>>>> Sent using Zoho Mail <https://www.zoho.com/mail/>
>>>>>>
>>>>>>
>>>>>>
>>>>>>


Re: read image or binary files / spark 2.3

2019-09-05 Thread Peter Liu
Hello experts,

I have a quick question: which API allows me to read image files or binary
files (for SparkSession.readStream) from a local/hadoop file system in
Spark 2.3?

I have been browsing the following documentation and googling for it, and
didn't find a good example/documentation:

https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html
https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.package

any hint/help would be very much appreciated!

thanks!

Peter


Re: read binary files (for stream reader) / spark 2.3

2019-09-09 Thread Peter Liu
Hello experts,

I have one additional question: how can I read binary files into a stream
reader object (intended for getting the data into a kafka server)?

I looked into the DataStreamReader API (
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-DataStreamReader.html#option)
and other google results, and didn't find an option for binary files.

Any help would be very much appreciated!
(thanks again for Ilya's helpful information below - it works fine on the
sparkContext object)
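
(for reference, a minimal sketch of the batch-side read that does work for
me via the sparkContext object - the path and the post-processing here are
assumed, just to illustrate:)

    // batch read of binary files: returns (file path, PortableDataStream) pairs
    val binaryRdd = spark.sparkContext
      .binaryFiles("hdfs:///data/binary-input/*")               // assumed path
      .map { case (path, stream) => (path, stream.toArray()) }  // materialize the bytes
    println(binaryRdd.count())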

Regards,

Peter


On Thu, Sep 5, 2019 at 3:09 PM Ilya Matiach  wrote:

> Hi Peter,
>
> You can use the spark.readImages API in spark 2.3 for reading images:
>
>
>
>
> https://databricks.com/blog/2018/12/10/introducing-built-in-image-data-source-in-apache-spark-2-4.html
>
>
> https://blogs.technet.microsoft.com/machinelearning/2018/03/05/image-data-support-in-apache-spark/
>
>
>
>
> https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.ml.image.ImageSchema$
>
>
>
> There’s also a spark package for spark versions older than 2.3:
>
> https://github.com/Microsoft/spark-images
>
>
>
> Thank you, Ilya
>
>
>
>
>
>
>
>
>
> *From:* Peter Liu 
> *Sent:* Thursday, September 5, 2019 2:13 PM
> *To:* dev ; User 
> *Subject:* Re: read image or binary files / spark 2.3
>
>
>
> Hello experts,
>
>
>
> I have quick question: which API allows me to read images files or binary
> files (for SparkSession.readStream) from a local/hadoop file system in
> Spark 2.3?
>
>
>
> I have been browsing the following documentations and googling for it and
> didn't find a good example/documentation:
>
>
>
> https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html
>
>
> https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.package
>
>
>
> any hint/help would be very much appreciated!
>
>
>
> thanks!
>
>
>
> Peter
>


re: sharing data via kafka broker using spark streaming/ AnalysisException on collect()

2018-04-30 Thread Peter Liu
Hello there,

I have a quick question regarding how to share data (a small data
collection) between a kafka producer and consumer using spark streaming
(spark 2.2):

(A)
The data published by a kafka producer is received in order on the kafka
consumer side (see (a) copied below).

(B)
However, collect() or cache() on a streaming dataframe does not seem to be
supported (see the links in (b) below); I got this:
Exception in thread "DataProducer" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;

(C)
My question would be:

--- How can I use the collection data (on a streaming dataframe) that
arrived on the consumer side, e.g. convert it to an array of objects?
--- Is there maybe another quick way to use kafka for sharing static data
(instead of streaming) between two spark application services (without any
common spark context, session, etc.)?

I have copied some code snippet in (c).

It seems to be a very simple use-case scenario to share a global collection
between a spark producer and consumer, but I spent an entire day trying
various options and going through online resources (general googling,
apache-spark docs, stackoverflow, cloudera, etc.).

Any help would be very much appreciated!

Thanks!

Peter

(a) streaming data (df) received on the consumer side (console sink):

root
 |-- ad_id: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------+------------------------------------+-----------------------+
|ad_id                               |campaign_id                         |timestamp              |
+------------------------------------+------------------------------------+-----------------------+
|b5629b58-376e-462c-9e65-726184390c84|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|64e93f73-15bb-478c-9f96-fd38f6a24da2|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|05fa1349-fcb3-432e-9b58-2bb0559859a2|060810fd-0430-444f-808c-8a177613226a|2018-04-27 14:35:45.478|
|ae0a176e-236a-4d3a-acb9-141157e81568|42b68023-6a3a-4618-a54a-e6f71df26710|2018-04-27 14:35:45.484|

(b) online discussions on unsupported operations on streaming dataframe:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations



https://stackoverflow.com/questions/42062092/why-does-using-cache-on-streaming-datasets-fail-with-analysisexception-queries


(c) code snippet:

OK:

   val rawDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("startingOffsets", "earliest")
  .option("subscribe", Variables.CAMPAIGNS_TOPIC)
  .load()

OK:

val mySchema = StructType(Array(
  StructField("ad_id", StringType),
  StructField("campaign_id", StringType)))

val campaignsDf2 = campaignsDf
  .select(from_json($"value", mySchema).as("data"), $"timestamp")
  .select("data.*", "timestamp")

OK:

   campaignsDf2.writeStream
     .format("console")
     .option("truncate", "false")
     .trigger(org.apache.spark.sql.streaming.Trigger.Once()) // trigger once since this is one-time static data
     .start()            // note: start() is needed before awaitTermination()
     .awaitTermination()


Exception:
  val campaignsArrayRows = campaignsDf2.collect()  // <-- not supported --> AnalysisException!
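
(for completeness, a rough sketch of one direction I have been looking at -
landing the one-time static data in the in-memory table sink and collecting
from the resulting batch table; the table name is assumed and I am not sure
this is the recommended way:)

   val query = campaignsDf2.writeStream
     .format("memory")                 // in-memory table sink
     .queryName("campaigns_tbl")       // assumed table name
     .trigger(org.apache.spark.sql.streaming.Trigger.Once())
     .start()
   query.awaitTermination()

   // now a regular (non-streaming) dataframe, so collect() is allowed
   val campaignsArrayRows = spark.table("campaigns_tbl").collect()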


Re: spark 2.3.1 with kafka spark-streaming-kafka-0-10 (java.lang.AbstractMethodError)

2018-07-01 Thread Peter Liu
 Hello there,

I didn't get any response/help from the user list for the following
question and thought people on the dev list might be able to help:

I upgraded from spark 2.2.1 to spark 2.3.1, ran my streaming workload and
got an error (java.lang.AbstractMethodError) I had never seen before; see
the exception stack attached in (a) below.

Does anyone know whether spark 2.3.1 works well with kafka
spark-streaming-kafka-0-10?

The spark kafka integration page doesn't say anything about any
limitation:
https://spark.apache.org/docs/2.3.1/streaming-kafka-integration.html


But this discussion seems to say there is indeed an issue when upgrading to
spark 2.3.1:
https://stackoverflow.com/questions/49180931/abstractmethoderror-creating-kafka-stream


I also rebuilt the workload against the spark 2.3.1 jars (see (b) below);
it doesn't seem to help.
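
(in case it matters: the kafka source dependency I suspect would also need
to be aligned with 2.3.1 - just a guess on my side, not verified:)

    // hypothetical build.sbt line - keep the structured streaming kafka source
    // at the same version as spark-sql/spark-core
    libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.3.1"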

It would be great if anyone could kindly share any insights here.

Thanks!

Peter

(a) the exception
Exception in thread "stream execution thread for [id =
5adae836-268a-4ebf-adc4-e3cc9fbe5acf, runId =
70e78d5c-665e-4c6f-a0cc-41a56e488e30]" java.lang.AbstractMethodError
at
org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.initializeLogIfNecessary(KafkaSourceProvider.scala:369)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.log(KafkaSourceProvider.scala:369)
at
org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.logDebug(KafkaSourceProvider.scala:369)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:439)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:394)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
at
org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
at
scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at
scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

(b) the build script update:

[pgl@datanode20 SparkStreamingBenchmark-RemoteConsumer-Spk231]$ diff build.sbt spk211-build.sbt.original
10,11c10,11
< libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.3.1"
< libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.3.1"
---
> libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.2.1"
> library