Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread forece85
We are doing batch processing using Spark Streaming with Kinesis with a batch
size of 5 mins. We want to send all events with same eventId to same
executor for a batch so that we can do multiple events based grouping
operations based on eventId. No previous batch or future batch data is
concerned. Only Current batch keyed operation needed.

Please help me how to achieve this. 

Thanks.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming - Routing rdd to Executor based on Key

2021-03-09 Thread forece85
We are doing batch processing using Spark Streaming with Kinesis with a batch
size of 5 mins. We want to send all events with same eventId to same
executor for a batch so that we can do multiple events based grouping
operations based on eventId. No previous batch or future batch data is
concerned. Only Current batch keyed operation needed.

Please help me how to achieve this. 

Thanks.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming with multiple Kafka topics

2021-03-05 Thread lalitha bandaru
Hi Team,

I have a spark streaming application configured to consume events from 2
Kafka topics. But when I run the application locally, the messages are
consumed from either of these topics only and not both. If the first event
is published to say topic2 and second message to topic1 then only the
messages from topic2 are consumed.
But when I deployed the same code to an EMR cluster then the messages from
both topics are being consumed.
Can someone please help what configuration could be missing when run
locally? Thanks much in advance!

Thanks,
Lalitha
-- 
Thanks,
Lalitha B
Mobile - +1 8046290486


Re: Converting spark batch to spark streaming

2021-01-08 Thread Jacek Laskowski
Hi,

Start with DataStreamWriter.foreachBatch.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Thu, Jan 7, 2021 at 6:55 PM mhd wrk  wrote:

> I'm trying to convert a spark batch application to a streaming application
> and wondering what function (or design pattern) I should use to execute a
> series of operations inside the driver upon arrival of each message (a text
> file inside an HDFS folder) before starting computation inside executors.
>
> Thanks,
> Mohammad
>


Converting spark batch to spark streaming

2021-01-07 Thread mhd wrk
I'm trying to convert a spark batch application to a streaming application
and wondering what function (or design pattern) I should use to execute a
series of operations inside the driver upon arrival of each message (a text
file inside an HDFS folder) before starting computation inside executors.

Thanks,
Mohammad


[Spark Streaming] Why is ZooKeeper LeaderElection Agent not being called by Spark Master?

2020-12-29 Thread Saloni Mehta
Hello,

Request you to please help me out on the below queries:

I have 2 spark masters and 3 zookeepers deployed on my system on separate
virtual machines. The services come up online in the below sequence:

   1. zookeeper-1
   2. sparkmaster-1
   3. sparkmaster-2
   4. zookeeper-2
   5. zookeeper-3

The above sequence leads to both the spark masters running in STANDBY mode.

>From the logs, I can see that only after zookeeper-2 service comes up (i.e.
2 zookeeper services are up), spark master is successfully able to create a
zookeeper session. Until zookeeper-2 is up, it re-tries session creation.
However, after both zookeeper services are up and Persistence Engine is
able to successfully connect and create a session; the ZooKeeper
LeaderElection Agent is not called.
Logs:

10:03:47.241 INFO  org.apache.spark.internal.Logging:57 -
Persisting recovery state to
ZooKeeper
Initiating client connection,
connectString=zookeeper-2:,zookeeper-3:,zookeeper-1:
sessionTimeout=6 watcher=org.apache.curator.ConnectionState

# Only zookeeper-2 is online #

10:03:47.630 INFO  org.apache.zookeeper.ClientCnxn$SendThread:1025
- Opening socket connection to
server zookeeper-1:. Will not attempt to authenticate using
SASL (unknown error)
10:03:50.635 INFO  org.apache.zookeeper.ClientCnxn$SendThread:1162
- Socket error occurred:
zookeeper-1:: No route to host
10:03:50.738 INFO  org.apache.zookeeper.ClientCnxn$SendThread:1025
- Opening socket
connection to server zookeeper-2:. Will not attempt to
authenticate using SASL (unknown
error)
2020-12-18 10:03:50.739 INFO
org.apache.zookeeper.ClientCnxn$SendThread:879 - Socket connection
established to zookeeper-2:, initiating session
10:03:50.742 INFO  org.apache.zookeeper.ClientCnxn$SendThread:1158
- Unable to read
additional data from server sessionid 0x0, likely server has
closed socket, closing socket
connection and attempting reconnect
10:03:51.842 INFO  org.apache.zookeeper.ClientCnxn$SendThread:1025
- Opening socket
connection to server zookeeper-3:. Will not attempt to
authenticate using SASL (unknown
error)
10:03:51.843 INFO  org.apache.zookeeper.ClientCnxn$SendThread:1162
- Socket error
occurred: zookeeper-3:: Connection refused

10:04:02.685 ERROR org.apache.curator.ConnectionState:200 -
Connection timed out for connection
string (zookeeper-2:,zookeeper-3:,zookeeper-1:) and
timeout (15000) / elapsed (15274)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode
= ConnectionLoss
at 
org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
at 
org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)

10:04:22.691 ERROR org.apache.curator.ConnectionState:200 -
Connection timed out for connection
string (zookeeper-2:,zookeeper-3:,zookeeper-1:) and
timeout (15000) / elapsed (35297)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode
= ConnectionLoss
at 
org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)

10:04:42.696 ERROR org.apache.curator.ConnectionState:200 -
Connection timed out for connection
string (zookeeper-2:,zookeeper-3:,zookeeper-1:) and
timeout (15000) / elapsed (55301)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode
= ConnectionLoss
at 
org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
at 
org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)

10:05:32.699 WARN  org.apache.curator.ConnectionState:191 -
Connection attempt unsuccessful after
105305 (greater than max timeout of 6). Resetting connection
and trying again with a new
connection.
10:05:32.864 INFO  org.apache.zookeeper.ZooKeeper:693 - Session: 0x0 closed
10:05:32.865 INFO  org.apache.zookeeper.ZooKeeper:442 - Initiating
client connection,
connectString=zookeeper-2:,zookeeper-3:,zookeeper-1:
sessionTimeout=6
watcher=org.apache.curator.ConnectionState@
10:05:32.864 INFO  org.apache.zookeeper.ClientCnxn$EventThread:522
- EventThread shut down for
session: 0x0

10:05:32.969 ERROR org.apache.spark.internal.Logging:94 - Ignoring error
org.apache.zookeeper.KeeperException$ConnectionLossException:
KeeperErrorCode = ConnectionLoss
for /x/y
at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)

# zookeeper-2, zookeeper-3 are online #

10:05:47.357 INFO  org.apache.zookeeper.ClientCnxn$SendThread:1025
- Opening socket connection to
server zookeeper-2:. Will not attempt to authenticate using
SASL (unknown error)
10:05:47.358 INFO  org.apache.zookeeper.ClientCnxn$SendThread:879
- Socket connection established
to 

[Spark Streaming] support of non-timebased windows and lag function

2020-12-22 Thread Moser, Michael
Hi,

I have a question regarding Spark structured streaming:
will non-timebased window operations like the lag function be supported at some 
point, or is this not on the table due to technical difficulties?

I.e. will something like this be possible in the future:
w = Window.partitionBy('uid').orderBy('timestamp')
df = df.withColumn('lag', lag(df['col_A']).over(w))

This would be useful in streaming applications, where we look for "patterns" 
based on the occurrence of multiple events (rows) in a particular order.
A different way to achieve the above functionality, while being more general, 
would be to use joins, but for this a streaming-ready, monotonically increasing 
and (!) concurrent uid would be needed.

Thanks a lot & best,
Michael


[Spark Streaming] unsubscribe and subscribe kafka topic without stopping context

2020-12-14 Thread Sanjay Tiwari
Hi All,

I am using spark streaming with kafka, I am working on an application which
requires me to process data based on user requests. I have created 2
dstream i.e. one per topic but I am unable to stop processing of data on
user request or re-enable processing of data.

I have asked question on stackoverflow as well ..
https://stackoverflow.com/questions/65285184/spark-subscribe-and-unsubscribe-topic-from-spark-streaming-without-requiring-res

Any help here will be appreciated.

Thanks
Sanjay


Re: how to manage HBase connections in Executors of Spark Streaming ?

2020-11-25 Thread chen kevin
  1.  the issue about that Kerberos expires.
 *   You don’t need to care aboubt usually, you can use the local keytab at 
every node in the Hadoop cluster.
 *   If there don’t have the keytab in your Hadoop cluster, you will need 
update your keytab in every executor periodically。
  2.  best practices about how to manage Hbase connections with kerberos 
authentication, the demo.java is the code about how to get the hbase connection.




From: big data 
Date: Tuesday, November 24, 2020 at 1:58 PM
To: "user@spark.apache.org" 
Subject: how to manage HBase connections in Executors of Spark Streaming ?


Hi,

Does any best practices about how to manage Hbase connections with kerberos 
authentication in Spark Streaming (YARN) environment?

Want to now how executors manage the HBase connections,how to create them, 
close them and refresh Kerberos expires.

Thanks.


demo.java
Description: demo.java

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

how to manage HBase connections in Executors of Spark Streaming ?

2020-11-23 Thread big data

Hi,

Does any best practices about how to manage Hbase connections with 
kerberos authentication in Spark Streaming (YARN) environment?


Want to now how executors manage the HBase connections,how to create 
them, close them and refresh Kerberos expires.


Thanks.



Single spark streaming job to read incoming events with dynamic schema

2020-11-16 Thread act_coder
I am trying to create a spark structured streaming job which reads from a
Kafka topic and the events coming from that Kafka topic will have different
schemas (There is no standard schema for the incoming events).

Sample incoming events:

event1: {timestamp:2018-09-28T15:50:57.2420418+00:00, value: 11}
event2: {timestamp:2018-09-28T15:50:57.2420418+00:00, value: 11,
location:abc}
event3: {order_id:1, ordervalue: 11}
How can I create a spark structured streaming to read the above events
without stopping the spark job for making any new schema changes ?

Also we may need to provide schema while using spark.readStream(). I thought
of reading a small subset of incoming data and derive the schema from it.
But, that might not work here as the incoming data is disparate and we may
have different schemas for each of the incoming events.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming with Kafka

2020-11-03 Thread Kevin Pis
Hi,

this is my  Word Count demo.  https://github.com/kevincmchen/wordcount

MohitAbbi  于2020年11月4日周三 上午3:32写道:

> Hi,
>
> Can you please share the correct versions of JAR files which you used to
> resolve the issue. I'm also facing the same issue.
>
> Thanks
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 

Best,

Kevin Pis


Re: Spark streaming with Kafka

2020-11-03 Thread MohitAbbi
Hi,

Can you please share the correct versions of JAR files which you used to
resolve the issue. I'm also facing the same issue.

Thanks




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming Job is stucked

2020-10-18 Thread Artemis User
If it was running fine before and stops working now, one thing I could 
think of may be your disk was full.  Check your disk space and clean up 
your old log files might help...


On 10/18/20 12:06 PM, rajat kumar wrote:

Hello Everyone,

My spark streaming job is running too slow, it is having batch time of 
15 seconds and the batch gets completed in 20-22 secs. It was fine 
till 1st week October, but it is behaving this way suddenly. I know 
changing the batch time can help , but other than that any idea what 
can be done?


Please note I am using Direct Stream Approach

Thanks
Rajat Sharma


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming Job is stucked

2020-10-18 Thread rajat kumar
Hello Everyone,

My spark streaming job is running too slow, it is having batch time of 15
seconds and the batch gets completed in 20-22 secs. It was fine till 1st
week October, but it is behaving this way suddenly. I know changing the
batch time can help , but other than that any idea what can be done?

Please note I am using Direct Stream Approach

Thanks
Rajat Sharma


Spark Streaming Custom Receiver for REST API

2020-10-06 Thread Muhammed Favas
Hi,
I have REST GET endpoint that gives data packet from a queue. I 
am thinking of implementing spark custom receiver to stream the data into spark.
Somebody please help me on how to integrate REST endpoint with spark custom 
receiver class.


Regards,
Favas



Re: Spark Streaming ElasticSearch

2020-10-06 Thread jainshasha
Hi Siva

In that case u can use structured streaming foreach / foreachBatch function
which can help you process each record and write it into some sink 




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Jainshasha,

I need to read each row from Dataframe and made some changes to it before
inserting it into ES.

Thanks
Siva

On Mon, Oct 5, 2020 at 8:06 PM jainshasha  wrote:

> Hi Siva
>
> To emit data into ES using spark structured streaming job you need to used
> ElasticSearch jar which has support for sink for spark structured streaming
> job. For this you can use this one my branch where we have integrated ES
> with spark 3.0 and scala 2.12 compatible
> https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0
>
> Also in this you need to build three jars
> elasticsearch-hadoop-sql
> elasticsearch-hadoop-core
> elasticsearch-hadoop-mr
> which help in writing data into ES through spark structured streaming.
>
> And in your application job u can use this way to sink the data, remember
> with ES there is only support of append mode of structured streaming.
> val esDf = aggregatedDF
> .writeStream
> .outputMode("append")
> .format("org.elasticsearch.spark.sql")
> .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
> .start("aggregation-job-index-latest-1")
>
>
> Let me know if you face any issues, will be happy to help you :)
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming ElasticSearch

2020-10-05 Thread jainshasha
Hi Siva

To emit data into ES using spark structured streaming job you need to used
ElasticSearch jar which has support for sink for spark structured streaming
job. For this you can use this one my branch where we have integrated ES
with spark 3.0 and scala 2.12 compatible
https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0

Also in this you need to build three jars 
elasticsearch-hadoop-sql
elasticsearch-hadoop-core
elasticsearch-hadoop-mr
which help in writing data into ES through spark structured streaming.

And in your application job u can use this way to sink the data, remember
with ES there is only support of append mode of structured streaming.
val esDf = aggregatedDF
.writeStream
.outputMode("append")
.format("org.elasticsearch.spark.sql")
.option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
.start("aggregation-job-index-latest-1")


Let me know if you face any issues, will be happy to help you :)




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Team,

I have a spark streaming job, which will read from kafka and write into
elastic via Http request.

I want to validate each request from Kafka and change the payload as per
business need and write into Elastic Search.

I have used ES Http Request to push the data into Elastic Search. Can some
guide me how to write the data into ES via a data frame?

*Code Snippet: *
 val dfInput = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("group.id", sourceTopicGroupId)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

import spark.implicits._

val resultDf = dfInput
  .withColumn("value", $"value".cast("string"))
  .select("value")

resultDf.writeStream.foreach(new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
processEventsData(value.get(0).asInstanceOf[String], deviceIndex,
msgIndex, retryOnConflict,auth,refreshInterval,deviceUrl,messageUrl,spark)
  }

  override def close(errorOrNull: Throwable): Unit = {
  }

}).trigger(Trigger.ProcessingTime(triggerPeriod)).start().awaitTermination()
//"1 second"
  }

Please suggest, is there any approach.

Thanks


Create custom receiver for MQTT in spark streaming

2020-10-01 Thread Muhammed Favas
Hi,

I have a requirement to do analysis using spark on the data coming from IoT 
device via MQTT broker. The connectivity from my spark job is with MQTT broker 
where I can subscribe to specific topics. I have used the MQTTUtils library in 
spark to connect to the broker, but I have doubts about how the library works 
internally. What I noticed is, "MQTTutils.createStream" connects to the MQTT 
broker for a topic and it is without any authentication to broker. In that 
case, if I have to subscribe 100 topics in an MQTT broker, it may establish 100 
connections to broker (Please let me know if it is not working the way I 
think.). That is not desirable in my real scenario.

So I have decided to create a custom receiver for MQTT broker so that I can 
manage the connection in my MQTT client. I have gone through the document on 
how to implement a custom receiver, but I did not succeed in implementing it in 
the proper way.

If somebody has hands-on on such a custom receiver, please help me to make it 
works.

Appreciate your support since it is critical for my solution

Regards,
Favas



Re: [Spark Prometheus Metrics] How to add my own metrics in spark streaming job?

2020-09-29 Thread christinegong
Hi,
In the spark job, it exports to prometheus localhost http server, to be
later scraped by prometheus service.
(https://github.com/prometheus/client_java#http) The problem here is when
ssh to the emr instances themselves, only can see the metrics on (e.g. curl
localhost:9111) driver in local mode. If i run the spark job in cluster
mode, the localhost:9111 is still available for curl but no data, executors
i can not curl at all. Same scenario when running in kubernetes, i also
check if the metrics are there or not by execing into the containers.
Hope that makes the question clear here
Prometheus supports pushgateway but thats for short batch job, not sure if
thats applicable for my long running streaming job
Thanks! 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Prometheus Metrics] How to add my own metrics in spark streaming job?

2020-09-29 Thread Artemis User
I am confused with your question.  Are you running a the Spark cluster 
on AWS EMR and trying to output the result to a Prometheus instance 
running on your localhost?   Isn't your localhost behind the firewall 
and not accessible by AWS?  What does it mean "have prometheus available 
in executors"?   Apparently you need to have a Prometheus instance 
running on AWS so your EMR cluster can access easily.


Directing Spark output/sink to Prometheus would be difficult. The ideal 
integration scenario would be to write a Spark customer connector that 
uses the Prometheus client library to populate your Spark processing 
result directly in Prometheus' database.  Hope this helps...


-- ND

On 9/28/20 3:21 AM, Christine Gong wrote:
What should i do to expose my own custom prometheus metrics for 
cluster mode spark streaming job?


I want to run a spark streaming job to read from kafka , do some 
calculations and write to localhost prometheus on port 9111. 
https://github.com/jaegertracing/jaeger-analytics-java/blob/master/spark/src/main/java/io/jaegertracing/analytics/spark/SparkRunner.java#L47 
is it possible to have the prometheus available in executors? I tried 
both emr cluster as well as k8s, only local mode works (the metrics 
are available on driver's 9111 only)
Looks like the prometheus servlet sink is my best option? Any advice 
would be much appreciated!!


Thanks,
Christine


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Spark Prometheus Metrics] How to add my own metrics in spark streaming job?

2020-09-28 Thread Christine Gong
What should i do to expose my own custom prometheus metrics for cluster
mode spark streaming job?

I want to run a spark streaming job to read from kafka , do some
calculations and write to localhost prometheus on port 9111.
https://github.com/jaegertracing/jaeger-analytics-java/blob/master/spark/src/main/java/io/jaegertracing/analytics/spark/SparkRunner.java#L47
is it possible to have the prometheus available in executors? I tried both
emr cluster as well as k8s, only local mode works (the metrics are
available on driver's 9111 only)
Looks like the prometheus servlet sink is my best option? Any advice would
be much appreciated!!

Thanks,
Christine


Spark streaming job not able to launch more number of executors

2020-09-18 Thread Vibhor Banga ( Engineering - VS)
Hi all,

We have a spark streaming job which reads from two kafka topics with 10
partitions each. And we are running the streaming job with 3 concurrent
microbatches. (So total 20 partitions and 3 concurrency)

We have following question:

In our processing DAG, we do a rdd.persist() at one stage, after which we
fork out the DAG into two. Each of the forks has an action (forEach) at the
end. In this case, we are observing that the number of executors is not
exceeding the number of input kafka partitions. Job is not spawning more
than 60 executors (2*10*3). And we see that the tasks from the two actions
and the 3 concurrent microbatches are competing with each other for
resources. So even though the max processing time of a task is 'x', the
overall  processing time of the stage is much greater than 'x'.

Is there a way by which we can ensure that the two forks of the DAG get
processed in parallel by spawning more number of executors?
(We have not put any cap of maxExecutors)

Following are the job configurations:
spark.dynamicAllocation.enabled: true
spark.dynamicAllocation.minExecutors: NOT_SET

Please let us know if you have any ideas that can be useful here.

Thanks,
-Vibhor

-- 



*-*


*This email and any files transmitted with it are confidential and 
intended solely for the use of the individual or entity to whom they are 
addressed. If you have received this email in error, please notify the 
system manager. This message contains confidential information and is 
intended only for the individual named. If you are not the named addressee, 
you should not disseminate, distribute or copy this email. Please notify 
the sender immediately by email if you have received this email by mistake 
and delete this email from your system. If you are not the intended 
recipient, you are notified that disclosing, copying, distributing or 
taking any action in reliance on the contents of this information is 
strictly prohibited.*

 

*Any views or opinions presented in this 
email are solely those of the author and do not necessarily represent those 
of the organization. Any information on shares, debentures or similar 
instruments, recommended product pricing, valuations and the like are for 
information purposes only. It is not meant to be an instruction or 
recommendation, as the case may be, to buy or to sell securities, products, 
services nor an offer to buy or sell securities, products or services 
unless specifically stated to be so on behalf of the Flipkart group. 
Employees of the Flipkart group of companies are expressly required not to 
make defamatory statements and not to infringe or authorise any 
infringement of copyright or any other legal right by email communications. 
Any such communication is contrary to organizational policy and outside the 
scope of the employment of the individual concerned. The organization will 
not accept any liability in respect of such communication, and the employee 
responsible will be personally liable for any damages or other liability 
arising.*

 

*Our organization accepts no liability for the 
content of this email, or for the consequences of any actions taken on the 
basis of the information *provided,* unless that information is 
subsequently confirmed in writing. If you are not the intended recipient, 
you are notified that disclosing, copying, distributing or taking any 
action in reliance on the contents of this information is strictly 
prohibited.*


_-_



Re: Spark Streaming Checkpointing

2020-09-04 Thread András Kolbert
Hi Gábor,


Thanks for your reply on this!

Internally that's used at the company I work at - it hasn't been changed
mainly due to the compatibility of the current deployed java applications.

Hence I am attempting to make the most of this version :)

András



On Fri, 4 Sep 2020, 14:09 Gabor Somogyi,  wrote:

> Hi Andras,
>
> A general suggestion is to use Structured Streaming instead of DStreams
> because it provides several things out of the box (stateful streaming,
> etc...).
> Kafka 0.8 is super old and deprecated (no security...). Do you have a
> specific reason to use that?
>
> BR,
> G
>
>
> On Thu, Sep 3, 2020 at 11:41 AM András Kolbert 
> wrote:
>
>> Hi All,
>>
>> I have a Spark streaming application (2.4.4, Kafka 0.8 >> so Spark Direct
>> Streaming) running just fine.
>>
>> I create a context in the following way:
>>
>> ssc = StreamingContext(sc, 60) opts = 
>> {"metadata.broker.list":kafka_hosts,"auto.offset.reset": "largest", 
>> "group.id": run_type}
>> kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
>> kvs.checkpoint(120)
>>
>> lines = kvs.map(lambda row: row[1]) lines.foreachRDD(streaming_app)
>> ssc.checkpoint(checkpoint)
>>
>> The streaming app at a high level does this:
>>
>>- processes incoming batch
>>- unions to the dataframe from the previous batch and aggregates them
>>
>> Currently, I use checkpointing explicitly (df = df.checkpoint()) to
>> optimise the lineage. Although this is quite an expensive exercise and was
>> wondering if there is a better way to do this.
>>
>> I tried to disable this explicit checkpointing, as I have a periodical
>> checkpointing (kvs.checkpoint(120) ) so I thought that the lineage will
>> be kept to that checkpointed RDD. Although in reality that is not the case
>> and processing keeps increasing over time.
>>
>> Am I doing something inherently wrong? Is there a better way of doing
>> this?
>>
>> Thanks
>> Andras
>>
>


Re: Spark Streaming Checkpointing

2020-09-04 Thread Gabor Somogyi
Hi Andras,

A general suggestion is to use Structured Streaming instead of DStreams
because it provides several things out of the box (stateful streaming,
etc...).
Kafka 0.8 is super old and deprecated (no security...). Do you have a
specific reason to use that?

BR,
G


On Thu, Sep 3, 2020 at 11:41 AM András Kolbert 
wrote:

> Hi All,
>
> I have a Spark streaming application (2.4.4, Kafka 0.8 >> so Spark Direct
> Streaming) running just fine.
>
> I create a context in the following way:
>
> ssc = StreamingContext(sc, 60) opts = 
> {"metadata.broker.list":kafka_hosts,"auto.offset.reset": "largest", 
> "group.id": run_type}
> kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
> kvs.checkpoint(120)
>
> lines = kvs.map(lambda row: row[1]) lines.foreachRDD(streaming_app)
> ssc.checkpoint(checkpoint)
>
> The streaming app at a high level does this:
>
>- processes incoming batch
>- unions to the dataframe from the previous batch and aggregates them
>
> Currently, I use checkpointing explicitly (df = df.checkpoint()) to
> optimise the lineage. Although this is quite an expensive exercise and was
> wondering if there is a better way to do this.
>
> I tried to disable this explicit checkpointing, as I have a periodical
> checkpointing (kvs.checkpoint(120) ) so I thought that the lineage will
> be kept to that checkpointed RDD. Although in reality that is not the case
> and processing keeps increasing over time.
>
> Am I doing something inherently wrong? Is there a better way of doing this?
>
> Thanks
> Andras
>


Spark Streaming Checkpointing

2020-09-03 Thread András Kolbert
Hi All,

I have a Spark streaming application (2.4.4, Kafka 0.8 >> so Spark Direct
Streaming) running just fine.

I create a context in the following way:

ssc = StreamingContext(sc, 60) opts =
{"metadata.broker.list":kafka_hosts,"auto.offset.reset": "largest",
 "group.id": run_type}
kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
kvs.checkpoint(120)

lines = kvs.map(lambda row: row[1]) lines.foreachRDD(streaming_app)
ssc.checkpoint(checkpoint)

The streaming app at a high level does this:

   - processes incoming batch
   - unions to the dataframe from the previous batch and aggregates them

Currently, I use checkpointing explicitly (df = df.checkpoint()) to
optimise the lineage. Although this is quite an expensive exercise and was
wondering if there is a better way to do this.

I tried to disable this explicit checkpointing, as I have a periodical
checkpointing (kvs.checkpoint(120) ) so I thought that the lineage will be
kept to that checkpointed RDD. Although in reality that is not the case and
processing keeps increasing over time.

Am I doing something inherently wrong? Is there a better way of doing this?

Thanks
Andras


Re: Appropriate checkpoint interval in a spark streaming application

2020-08-15 Thread Sheel Pancholi
Guys any inputs explaining the rationale on the below question will really
help. Requesting some expert opinion.

Regards,
Sheel

On Sat, 15 Aug, 2020, 1:47 PM Sheel Pancholi,  wrote:

> Hello,
>
> I am trying to figure an appropriate checkpoint interval for my spark
> streaming application. Its Spark Kafka integration based on Direct Streams.
>
> If my *micro batch interval is 2 mins*, and let's say *each microbatch
> takes only 15 secs to process* then shouldn't my checkpoint interval also
> be exactly 2 mins?
>
> Assuming my spark streaming application starts at t=0, following will be
> the state of my checkpoint:
>
> *Case 1: checkpoint interval is less than microbatch interval*
> If I keep my *checkpoint interval at say 1 minute *then:
> *t=1m: *no incomplete batches in this checkpoint
> *t=2m*: first microbatch is included as an incomplete microbatch in the
> checkpoint and microbatch execution then begins
> *t=3m*: no incomplete batches in the checkpoint as the first microbatch
> is finished processing in just 15 secs
> *t=4m*: second microbatch is included as an incomplete microbatch in the
> checkpoint and microbatch execution then begins
> *t=4m30s*: system breaks down; on restarting the streaming application
> finds the checkpoint at t=4 with the second microbatch as the incomplete
> microbatch and processes it again. But what's the point of reprocessing it
> again since the second microbatch's processing was completed at the=4m15s
>
> *Case 2: checkpoint interval is more than microbatch interval*
> If I keep my *checkpoint interval at say 4 minutes* then:
> *t=2m* first microbatch execution begins
> *t=4m* first checkpoint with second microbatch included as the only
> incomplete batch; second microbatch processing begins
>
> *Sub case 1 :* *system breaks down at t=2m30s :* the first microbatch
> execution was completed at the=2m15s but there is no checkpoint information
> about this microbatch since the first checkpoint will happen at t=4m.
> Consequently, when the streaming app restarts it will re-execute by
> fetching the offsets from Kafka.
>
> *Sub case 2 :* *system breaks down at t=5m :* The second microbatch was
> already completed in 15 secs i.e. t=4m15s which means at t=5 there should
> ideally be no incomplete batches. When I restart my application, the
> streaming application finds the second microbatch as incomplete from the
> checkpoint made at t=4m, and re-executes that microbatch.
>
>
> Is my understanding right? If yes, then isn't my checkpoint interval
> incorrectly set resulting in duplicate processing in both the cases above?
> If yes, then how do I choose an appropriate checkpoint interval?
>
> Regards,
> Sheel
>


Appropriate checkpoint interval in a spark streaming application

2020-08-15 Thread Sheel Pancholi
Hello,

I am trying to figure an appropriate checkpoint interval for my spark
streaming application. Its Spark Kafka integration based on Direct Streams.

If my *micro batch interval is 2 mins*, and let's say *each microbatch
takes only 15 secs to process* then shouldn't my checkpoint interval also
be exactly 2 mins?

Assuming my spark streaming application starts at t=0, following will be
the state of my checkpoint:

*Case 1: checkpoint interval is less than microbatch interval*
If I keep my *checkpoint interval at say 1 minute *then:
*t=1m: *no incomplete batches in this checkpoint
*t=2m*: first microbatch is included as an incomplete microbatch in the
checkpoint and microbatch execution then begins
*t=3m*: no incomplete batches in the checkpoint as the first microbatch is
finished processing in just 15 secs
*t=4m*: second microbatch is included as an incomplete microbatch in the
checkpoint and microbatch execution then begins
*t=4m30s*: system breaks down; on restarting the streaming application
finds the checkpoint at t=4 with the second microbatch as the incomplete
microbatch and processes it again. But what's the point of reprocessing it
again since the second microbatch's processing was completed at the=4m15s

*Case 2: checkpoint interval is more than microbatch interval*
If I keep my *checkpoint interval at say 4 minutes* then:
*t=2m* first microbatch execution begins
*t=4m* first checkpoint with second microbatch included as the only
incomplete batch; second microbatch processing begins

*Sub case 1 :* *system breaks down at t=2m30s :* the first microbatch
execution was completed at the=2m15s but there is no checkpoint information
about this microbatch since the first checkpoint will happen at t=4m.
Consequently, when the streaming app restarts it will re-execute by
fetching the offsets from Kafka.

*Sub case 2 :* *system breaks down at t=5m :* The second microbatch was
already completed in 15 secs i.e. t=4m15s which means at t=5 there should
ideally be no incomplete batches. When I restart my application, the
streaming application finds the second microbatch as incomplete from the
checkpoint made at t=4m, and re-executes that microbatch.


Is my understanding right? If yes, then isn't my checkpoint interval
incorrectly set resulting in duplicate processing in both the cases above?
If yes, then how do I choose an appropriate checkpoint interval?

Regards,
Sheel


Re: Spark Streaming with Kafka and Python

2020-08-12 Thread Sean Owen
What supports Python in (Kafka?) 0.8? I don't think Spark ever had a
specific Python-Kafka integration. But you have always been able to
use it to read DataFrames as in Structured Streaming.
Kafka 0.8 support is deprecated (gone in 3.0) but 0.10 means 0.10+ -
works with the latest 2.x.
What is the issue?

On Wed, Aug 12, 2020 at 7:53 AM German Schiavon
 wrote:
>
> Hey,
>
> Maybe I'm missing some restriction with EMR, but have you tried to use 
> Structured Streaming instead of Spark Streaming?
>
> https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html
>
> Regards
>
> On Wed, 12 Aug 2020 at 14:12, Hamish Whittal  
> wrote:
>>
>> Hi folks,
>>
>> Thought I would ask here because it's somewhat confusing. I'm using Spark 
>> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>>
>> The version of Scala used is 2.11.12. I'm using this version of the 
>> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>>
>> Now I'm wanting to read from Kafka topics using Python (I need to stick to 
>> Python specifically).
>>
>> What seems confusing is that 0.8 has Python support, but 0.10 does not. Then 
>> 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using 2.4.5 
>> then clearly I'm going to hit a roadblock here.
>>
>> Can someone clarify these things for me? Have I got this right?
>>
>> Thanks in advance,
>> Hamish

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka and Python

2020-08-12 Thread German Schiavon
Hey,

Maybe I'm missing some restriction with EMR, but have you tried to use
Structured Streaming instead of Spark Streaming?

https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html

Regards

On Wed, 12 Aug 2020 at 14:12, Hamish Whittal 
wrote:

> Hi folks,
>
> Thought I would ask here because it's somewhat confusing. I'm using Spark
> 2.4.5 on EMR 5.30.1 with Amazon MSK.
>
> The version of Scala used is 2.11.12. I'm using this version of the
> libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar
>
> Now I'm wanting to read from Kafka topics using Python (I need to stick to
> Python specifically).
>
> What seems confusing is that 0.8 has Python support, but 0.10 does not.
> Then 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using
> 2.4.5 then clearly I'm going to hit a roadblock here.
>
> Can someone clarify these things for me? Have I got this right?
>
> Thanks in advance,
> Hamish
>


Spark Streaming with Kafka and Python

2020-08-12 Thread Hamish Whittal
Hi folks,

Thought I would ask here because it's somewhat confusing. I'm using Spark
2.4.5 on EMR 5.30.1 with Amazon MSK.

The version of Scala used is 2.11.12. I'm using this version of the
libraries spark-streaming-kafka-0-8_2.11-2.4.5.jar

Now I'm wanting to read from Kafka topics using Python (I need to stick to
Python specifically).

What seems confusing is that 0.8 has Python support, but 0.10 does not.
Then 0.8 seems to have been deprecated as of Spark 2.3.0, so if I'm using
2.4.5 then clearly I'm going to hit a roadblock here.

Can someone clarify these things for me? Have I got this right?

Thanks in advance,
Hamish


Re: Spark streaming receivers

2020-08-10 Thread Russell Spitzer
The direct approach, which is also available through dstreams, and
structured streaming use a different model. Instead of being a push based
streaming solution they instead are pull based. (In general)

On every batch the driver uses the configuration to create a number of
partitions, each is responsible for independently pulling a number of
records. The exact number of records and guarantees around the pull are
source and configuration dependent. Since the system is pull based, there
is no need for a receiver or block management system taking up resources.
Every task/partition contains all the information required to get the data
that it describes.

An example in Kafka, the driver might decide that batch 1 contains all the
records between offset 1 and 100. It checks and sees that there are 10
Kafka partitions. So it ends up making a spark job which contains 10 tasks
each task dedicated to a single Kafka partition. Each task will then
independently ask for 100 records from it's Kafka partition. There will be
no Spark resources used outside of those required for those 10 tasks.

On Sun, Aug 9, 2020, 10:44 PM Dark Crusader 
wrote:

> Hi Russell,
> This is super helpful. Thank you so much.
>
> Can you elaborate on the differences between structured streaming vs
> dstreams? How would the number of receivers required etc change?
>
> On Sat, 8 Aug, 2020, 10:28 pm Russell Spitzer, 
> wrote:
>
>> Note, none of this applies to Direct streaming approaches, only receiver
>> based Dstreams.
>>
>> You can think of a receiver as a long running task that never finishes.
>> Each receiver is submitted to an executor slot somewhere, it then runs
>> indefinitely and internally has a method which passes records over to a
>> block management system. There is a timing that you set which decides when
>> each block is "done" and records after that time has passed go into the
>> next block (See parameter
>> <https://spark.apache.org/docs/latest/configuration.html#spark-streaming>
>>  spark.streaming.blockInterval)  Once a block is done it can be
>> processed in the next Spark batch.. The gap between a block starting and a
>> block being finished is why you can lose data in Receiver streaming without
>> WriteAheadLoging. Usually your block interval is divisible into your batch
>> interval so you'll get X blocks per batch. Each block becomes one partition
>> of the job being done in a Streaming batch. Multiple receivers can be
>> unified into a single dstream, which just means the blocks produced by all
>> of those receivers are handled in the same Streaming batch.
>>
>> So if you have 5 different receivers, you need at minimum 6 executor
>> cores. 1 core for each receiver, and 1 core to actually do your processing
>> work. In a real world case you probably want significantly more  cores on
>> the processing side than just 1. Without repartitioning you will never have
>> more that
>>
>> A quick example
>>
>> I run 5 receivers with block interval of 100ms and spark batch interval
>> of 1 second. I use union to group them all together, I will most likely end
>> up with one Spark Job for each batch every second running with 50
>> partitions (1000ms / 100(ms / partition / receiver) * 5 receivers). If I
>> have a total of 10 cores in the system. 5 of them are running receivers,
>> The remaining 5 must process the 50 partitions of data generated by the
>> last second of work.
>>
>> And again, just to reiterate, if you are doing a direct streaming
>> approach or structured streaming, none of this applies.
>>
>> On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm having some trouble figuring out how receivers tie into spark
>>> driver-executor structure.
>>> Do all executors have a receiver that is blocked as soon as it
>>> receives some stream data?
>>> Or can multiple streams of data be taken as input into a single executor?
>>>
>>> I have stream data coming in at every second coming from 5 different
>>> sources. I want to aggregate data from each of them. Does this mean I need
>>> 5 executors or does it have to do with threads on the executor?
>>>
>>> I might be mixing in a few concepts here. Any help would be appreciated.
>>> Thank you.
>>>
>>


Re: Spark streaming receivers

2020-08-09 Thread Dark Crusader
Hi Russell,
This is super helpful. Thank you so much.

Can you elaborate on the differences between structured streaming vs
dstreams? How would the number of receivers required etc change?

On Sat, 8 Aug, 2020, 10:28 pm Russell Spitzer, 
wrote:

> Note, none of this applies to Direct streaming approaches, only receiver
> based Dstreams.
>
> You can think of a receiver as a long running task that never finishes.
> Each receiver is submitted to an executor slot somewhere, it then runs
> indefinitely and internally has a method which passes records over to a
> block management system. There is a timing that you set which decides when
> each block is "done" and records after that time has passed go into the
> next block (See parameter
> <https://spark.apache.org/docs/latest/configuration.html#spark-streaming>
> spark.streaming.blockInterval)  Once a block is done it can be processed
> in the next Spark batch.. The gap between a block starting and a block
> being finished is why you can lose data in Receiver streaming without
> WriteAheadLoging. Usually your block interval is divisible into your batch
> interval so you'll get X blocks per batch. Each block becomes one partition
> of the job being done in a Streaming batch. Multiple receivers can be
> unified into a single dstream, which just means the blocks produced by all
> of those receivers are handled in the same Streaming batch.
>
> So if you have 5 different receivers, you need at minimum 6 executor
> cores. 1 core for each receiver, and 1 core to actually do your processing
> work. In a real world case you probably want significantly more  cores on
> the processing side than just 1. Without repartitioning you will never have
> more that
>
> A quick example
>
> I run 5 receivers with block interval of 100ms and spark batch interval of
> 1 second. I use union to group them all together, I will most likely end up
> with one Spark Job for each batch every second running with 50 partitions
> (1000ms / 100(ms / partition / receiver) * 5 receivers). If I have a total
> of 10 cores in the system. 5 of them are running receivers, The remaining 5
> must process the 50 partitions of data generated by the last second of work.
>
> And again, just to reiterate, if you are doing a direct streaming approach
> or structured streaming, none of this applies.
>
> On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm having some trouble figuring out how receivers tie into spark
>> driver-executor structure.
>> Do all executors have a receiver that is blocked as soon as it
>> receives some stream data?
>> Or can multiple streams of data be taken as input into a single executor?
>>
>> I have stream data coming in at every second coming from 5 different
>> sources. I want to aggregate data from each of them. Does this mean I need
>> 5 executors or does it have to do with threads on the executor?
>>
>> I might be mixing in a few concepts here. Any help would be appreciated.
>> Thank you.
>>
>


Re: Spark streaming receivers

2020-08-08 Thread Russell Spitzer
Note, none of this applies to Direct streaming approaches, only receiver
based Dstreams.

You can think of a receiver as a long running task that never finishes.
Each receiver is submitted to an executor slot somewhere, it then runs
indefinitely and internally has a method which passes records over to a
block management system. There is a timing that you set which decides when
each block is "done" and records after that time has passed go into the
next block (See parameter
<https://spark.apache.org/docs/latest/configuration.html#spark-streaming>
spark.streaming.blockInterval)  Once a block is done it can be processed in
the next Spark batch.. The gap between a block starting and a block being
finished is why you can lose data in Receiver streaming without
WriteAheadLoging. Usually your block interval is divisible into your batch
interval so you'll get X blocks per batch. Each block becomes one partition
of the job being done in a Streaming batch. Multiple receivers can be
unified into a single dstream, which just means the blocks produced by all
of those receivers are handled in the same Streaming batch.

So if you have 5 different receivers, you need at minimum 6 executor cores.
1 core for each receiver, and 1 core to actually do your processing work.
In a real world case you probably want significantly more  cores on the
processing side than just 1. Without repartitioning you will never have
more that

A quick example

I run 5 receivers with block interval of 100ms and spark batch interval of
1 second. I use union to group them all together, I will most likely end up
with one Spark Job for each batch every second running with 50 partitions
(1000ms / 100(ms / partition / receiver) * 5 receivers). If I have a total
of 10 cores in the system. 5 of them are running receivers, The remaining 5
must process the 50 partitions of data generated by the last second of work.

And again, just to reiterate, if you are doing a direct streaming approach
or structured streaming, none of this applies.

On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader 
wrote:

> Hi,
>
> I'm having some trouble figuring out how receivers tie into spark
> driver-executor structure.
> Do all executors have a receiver that is blocked as soon as it
> receives some stream data?
> Or can multiple streams of data be taken as input into a single executor?
>
> I have stream data coming in at every second coming from 5 different
> sources. I want to aggregate data from each of them. Does this mean I need
> 5 executors or does it have to do with threads on the executor?
>
> I might be mixing in a few concepts here. Any help would be appreciated.
> Thank you.
>


Spark streaming receivers

2020-08-08 Thread Dark Crusader
Hi,

I'm having some trouble figuring out how receivers tie into spark
driver-executor structure.
Do all executors have a receiver that is blocked as soon as it
receives some stream data?
Or can multiple streams of data be taken as input into a single executor?

I have stream data coming in at every second coming from 5 different
sources. I want to aggregate data from each of them. Does this mean I need
5 executors or does it have to do with threads on the executor?

I might be mixing in a few concepts here. Any help would be appreciated.
Thank you.


Re: Kafka with Spark Streaming work on local but it doesn't work in Standalone mode

2020-07-24 Thread Gabor Somogyi
Hi Davide,

Please see the doc:
*Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.*

Have you tried the same with Structured Streaming and not with DStreams?
If you insist somehow to DStreams you can use spark-streaming-kafka-0-10
connector instead.

BR,
G


On Fri, Jul 24, 2020 at 12:08 PM Davide Curcio 
wrote:

> Hi,
>
> I'm trying to use Spark Streaming with a very simple script like this:
>
> from pyspark import SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
>
> sc = SparkContext(appName="PythonSparkStreamingKafka")
>
>
> ssc = StreamingContext(sc, 1)
> kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
>"auto.offset.reset": "smallest"}
>
> training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
>
> training.pprint()
>
> ssc.start()
> ssc.awaitTermination()
>
> But although locally it works, with the cluster using Standalone mode it
> crashes. I have a cluster with 4 machines:
>
> 1 machine with Kafka Producer, 1 Broker and 1 Zookeeper
> 1 machine is the driver
> 2 machines are the workers.
>
> The strange thing is that when I had Kafka Producer, Broker and Zookeeper
> in the same machine in which I have the driver, it worked both locally and
> in the cluster. But obviously for the sake of scalability and modularity
> I'd like to use the current configuration.
>
> I'm using Spark 2.4.6, the Kafka Streaming API are
> "spark-streaming-kafka-0-8-assembly_2.11-2.4.6" and the Kafka version that
> I'm currently using is kafka_2.11-2.4.1
>
> The result is the following:
>
> 020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in
> stage 0.0 (TID 0, 172.31.69.185, executor 0):
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
> 2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1
> in stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in
> stage 0.0 (TID 1) on 172.31.69.185, executor 0:
> java.nio.channels.ClosedChannelException (null) [duplicate 1]
> 2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2
> in stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in
> stage 0.0 (TID 2) on 172.31.69.185, executor 0:
> java.nio.channels.ClosedChannelException (null) [duplicate 2]
> 2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3
> in stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time
> 1595584106000 ms
> 

Kafka with Spark Streaming work on local but it doesn't work in Standalone mode

2020-07-24 Thread Davide Curcio
Hi,

I'm trying to use Spark Streaming with a very simple script like this:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


sc = SparkContext(appName="PythonSparkStreamingKafka")


ssc = StreamingContext(sc, 1)
kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
   "auto.offset.reset": "smallest"}

training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

training.pprint()

ssc.start()
ssc.awaitTermination()
But although locally it works, with the cluster using Standalone mode it 
crashes. I have a cluster with 4 machines:

1 machine with Kafka Producer, 1 Broker and 1 Zookeeper
1 machine is the driver
2 machines are the workers.

The strange thing is that when I had Kafka Producer, Broker and Zookeeper in 
the same machine in which I have the driver, it worked both locally and in the 
cluster. But obviously for the sake of scalability and modularity I'd like to 
use the current configuration.

I'm using Spark 2.4.6, the Kafka Streaming API are 
"spark-streaming-kafka-0-8-assembly_2.11-2.4.6" and the Kafka version that I'm 
currently using is kafka_2.11-2.4.1

The result is the following:

020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 
0.0 (TID 0, 172.31.69.185, executor 0): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1 in 
stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 
0.0 (TID 1) on 172.31.69.185, executor 0: 
java.nio.channels.ClosedChannelException (null) [duplicate 1]
2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2 in 
stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 
0.0 (TID 2) on 172.31.69.185, executor 0: 
java.nio.channels.ClosedChannelException (null) [duplicate 2]
2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3 in 
stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time 
1595584106000 ms
2020-07-24 09:48:26,375 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 
in memory on 172.31.79.221:44371 (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,022 INFO scheduler.JobScheduler: Added jobs for time 
1595584107000 ms
2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 
0.0 (TID 3) on 172.31.79.221, executor 1: 
java.nio.channels.ClosedChannelException (null) [duplicate 3]
2020-07-24 09:48:27,167 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 
failed 4 times; aborting job
2020-07-24 09:48:27,171 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, 
whose tasks have all completed, from pool
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedul

How to introduce reset logic when aggregating/joining streaming dataframe with static dataframe for spark streaming

2020-07-24 Thread Yong Yuan
A good feature of spark structured streaming is that it can join the static
dataframe with the streaming dataframe. To cite an example as below. users
is a static dataframe read from database. transactionStream is from a
stream. By the joining operation, we can get the spending of each country
accumulated with the new arrival of batches.

val spendingByCountry = (transactionStream
.join(users, users("id") === transactionStream("userid"))
.groupBy($"country")
.agg(sum($"cost")) as "spending")

spendingByContry.writeStream
.outputMode("complete")
.format("console")
.start()

The sum of cost is aggregated with the arrival of new batches as shown
below.

---
Batch: 0
---
Country Spending
EN  90.0
FR  50.0

---
Batch: 1
---
Country Spending
EN  190.0
FR  150.0

If I want to introduce a notification and reset logic as the above example,
what should be the correct approach? The requirement is that if the
spending is larger than some threshold, the records of country and spending
should be stored into a table and the spending should be reset as 0 to
accumulate again.


Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread Russell Spitzer
Without seeing the rest (and you can confirm this by looking at the DAG
visualization in the Spark UI) I would say your first stage with 6
partitions is:

Stage 1: Read data from kinesis (or read blocks from receiver not sure what
method you are using) and write shuffle files for repartition
Stage 2 : Read shuffle files and do everything else

In general I think the biggest issue here is probably not the distribution
of tasks which based on your UI reading were quite small, but instead the
parallelization of the write operation since it is done synchronously. I
would suggest instead of trying to increase your parallelism by
partitioning, you attempt to have "doJob" run asynchronously and allow for
more parallelism within an executor rather than using multiple executor
threads/jvms.

That said you probably would run faster if you just skipped the repartition
based on the speed of second stage.

On Mon, Jul 20, 2020 at 8:23 AM forece85  wrote:

> Thanks for reply. Please find sudo code below. Its Dstreams reading for
> every
> 10secs from kinesis stream and after transformations, pushing into hbase.
> Once got Dstream, we are using below code to repartition and do processing:
>
> dStream = dStream.repartition(javaSparkContext.defaultMinPartitions() * 3);
> dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords
> ->
> {
>Connection hbaseConnection= ConnectionUtil.getHbaseConnection();
>List listOfRecords = new ArrayList<>();
>while (partitionOfRecords.hasNext()) {
>  listOfRecords.add(partitionOfRecords.next());
>
>  if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
> continue;
>
>  List finalListOfRecords = listOfRecords;
>  doJob(finalListOfRecords, hbaseConnection);
>  listOfRecords = new ArrayList<>();
>}
> }));
>
>
> We are batching every 10 records and pass to doJob method where we batch
> process and bulk insert to hbase.
>
> With above code, will it be able to tell what is happening at job 1 -> 6
> tasks? and how to replace repartition method efficiently.
>
> Thanks in Advance
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread forece85
Thanks for reply. Please find sudo code below. Its Dstreams reading for every
10secs from kinesis stream and after transformations, pushing into hbase.
Once got Dstream, we are using below code to repartition and do processing:

dStream = dStream.repartition(javaSparkContext.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords ->
{
   Connection hbaseConnection= ConnectionUtil.getHbaseConnection();
   List listOfRecords = new ArrayList<>();
   while (partitionOfRecords.hasNext()) {
 listOfRecords.add(partitionOfRecords.next());

 if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
continue;
 
 List finalListOfRecords = listOfRecords;
 doJob(finalListOfRecords, hbaseConnection);
 listOfRecords = new ArrayList<>();
   }
}));


We are batching every 10 records and pass to doJob method where we batch
process and bulk insert to hbase.

With above code, will it be able to tell what is happening at job 1 -> 6
tasks? and how to replace repartition method efficiently.

Thanks in Advance



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread forece85
Thanks for reply. Please find sudo code below. We are fetching Dstreams from
kinesis stream for every 10sec and performing transformations and finally
persisting to hbase tables using batch insertions.

dStream = dStream.repartition(jssc.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords ->
{
Connection hbaseConnection =
ConnectionUtil.getHbaseConnection();
List listOfRecords = new ArrayList<>();
while (partitionOfRecords.hasNext()) {
try {
listOfRecords.add(partitionOfRecords.next());

if (listOfRecords.size() < 10 &&
partitionOfRecords.hasNext())
continue;

List finalListOfRecords = listOfRecords;
doJob(finalListOfRecords, primaryConnection,
lookupsConnection);
listOfRecords = new ArrayList<>();
} catch (Exception e) {
e.printStackTrace();
}
}
})); 

We are batching every 10 records and sending to doJob method where actual
transformations happen and every batch will get batch inserted to hbase
table.

With above code can we guess whats happening at Job 1 => 6 tasks and how to
reduce that time. 
Mainly how to effectively set parallelism avoiding repartition() method.

Thanks in Advance.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread Russell Spitzer
Without your code this is hard to determine but a few notes.

The number of partitions is usually dictated by the input source, see if it
has any configuration which allows you to increase input splits.

I'm not sure why you think some of the code is running on the driver. All
methods on dataframes and rdds will be executed on executors. For each
partition is not local.

The difference in partitions is probably the shuffle you added with
repartition. I would actually be not surprised if your code ran faster
without the repartitioning. But again with the real code it would be very
hard to say.

On Mon, Jul 20, 2020, 6:33 AM forece85  wrote:

> I am new to spark streaming and trying to understand spark ui and to do
> optimizations.
>
> 1. Processing at executors took less time than at driver. How to optimize
> to
> make driver tasks fast ?
> 2. We are using dstream.repartition(defaultParallelism*3) to increase
> parallelism which is causing high shuffles. Is there any option to avoid
> repartition manually to reduce data shuffles.
> 3. Also trying to understand how 6 tasks in stage1 and 199 tasks in stage2
> got created?
>
> *hardware configuration:* executor-cores: 3; driver-cores: 3;
> dynamicAllocation is true;
> initial,min,maxExecutors: 25
>
> StackOverFlow link for screenshots:
>
> https://stackoverflow.com/questions/62993030/spark-dstream-help-needed-to-understand-ui-and-how-to-set-parallelism-or-defau
>
> Thanks in Advance
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread forece85
I am new to spark streaming and trying to understand spark ui and to do
optimizations.

1. Processing at executors took less time than at driver. How to optimize to
make driver tasks fast ?
2. We are using dstream.repartition(defaultParallelism*3) to increase
parallelism which is causing high shuffles. Is there any option to avoid
repartition manually to reduce data shuffles.
3. Also trying to understand how 6 tasks in stage1 and 199 tasks in stage2
got created?

*hardware configuration:* executor-cores: 3; driver-cores: 3;
dynamicAllocation is true; 
initial,min,maxExecutors: 25

StackOverFlow link for screenshots:
https://stackoverflow.com/questions/62993030/spark-dstream-help-needed-to-understand-ui-and-how-to-set-parallelism-or-defau

Thanks in Advance



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming with Confluent kafka

2020-07-03 Thread Gabor Somogyi
The error is clear:
Caused by: java.lang.IllegalArgumentException: No serviceName defined in
either JAAS or Kafka config

On Fri, 3 Jul 2020, 15:40 dwgw,  wrote:

> Hi
>
> I am trying to stream confluent kafka topic in the spark shell. For that i
> have invoked spark shell using following command.
>
> # spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
> --conf
>
> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf"
> --driver-java-options
> "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" --files
> /home/spark/kafka_jaas.conf
>
> kafka_jaas.conf
> -
>
> KafkaClient {
>
>  org.apache.kafka.common.security.plain.PlainLoginModule required
>username="XXX"
>password="XXX";
> };
>
> Readstream
> -
>
> scala> val df = spark.
> | readStream.
> | format("kafka").
> | option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
> | option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
> | option("kafka.sasl.mechanisms", "PLAIN").
> | option("kafka.security.protocol", "SASL_SSL").
> | option("startingOffsets", "earliest").
> | load.
> | select($"value".cast("string").alias("value"))
> df: org.apache.spark.sql.DataFrame = [value: string]
>
> Writestream
> --
>
> scala> df.writeStream.
> | format("console").
> | outputMode("append").
> | trigger(Trigger.ProcessingTime("20 seconds")).
> | start
> 2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
> checkpoint location created which is deleted normally when the query didn't
> fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
> to delete it under any circumstances, please set
> spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
> know deleting temp checkpoint folder is best effort.
> res0: org.apache.spark.sql.streaming.StreamingQuery =
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741
>
> scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
> attempt 1 getting Kafka offsets:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:820)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:612)
> at
>
> org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
>
> org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
> at
>
> org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
> at
>
> org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
> at scala.Option.getOrElse(Option.scala:189)
> at
>
> org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
> at
>
> org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
> at
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
> at scala.Option.getOrElse(Option.scala:189)
> at
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
> at
>
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
> at
>
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
> at
>
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
> at 

Spark streaming with Confluent kafka

2020-07-03 Thread dwgw
Hi

I am trying to stream confluent kafka topic in the spark shell. For that i
have invoked spark shell using following command.

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
--conf
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf"
--driver-java-options
"-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" --files
/home/spark/kafka_jaas.conf

kafka_jaas.conf
-

KafkaClient {
                     
 org.apache.kafka.common.security.plain.PlainLoginModule required
                       username="XXX"
                       password="XXX";
};

Readstream
-

scala> val df = spark.
| readStream.
| format("kafka").
| option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
| option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
| option("kafka.sasl.mechanisms", "PLAIN").
| option("kafka.security.protocol", "SASL_SSL").
| option("startingOffsets", "earliest").
| load.
| select($"value".cast("string").alias("value"))
df: org.apache.spark.sql.DataFrame = [value: string]

Writestream
--

scala> df.writeStream.
| format("console").
| outputMode("append").
| trigger(Trigger.ProcessingTime("20 seconds")).
| start
2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
checkpoint location created which is deleted normally when the query didn't
fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
to delete it under any circumstances, please set
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
know deleting temp checkpoint folder is best effort.
res0: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741

scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:820)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:612)
at
org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at

Re: Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi

I am able to correct the issue. The issue was due to wrong version of JAR
file I have used. I have removed the these JAR files and copied correct
version of JAR files and the error has gone away.

Regards



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming with Kafka

2020-07-02 Thread Jungtaek Lim
I can't reproduce. Could you please make sure you're running spark-shell
with official spark 3.0.0 distribution? Please try out changing the
directory and using relative path like "./spark-shell".

On Thu, Jul 2, 2020 at 9:59 PM dwgw  wrote:

> Hi
> I am trying to stream kafka topic from spark shell but i am getting the
> following error.
> I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
> *Java 1.8.0_212*)
>
> *[spark@hdp-dev ~]$ spark-shell --packages
> org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
> Ivy Default Cache set to: /home/spark/.ivy2/cache
> The jars for the packages stored in: /home/spark/.ivy2/jars
> :: loading settings :: url =
>
> jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
> org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
> :: resolving dependencies ::
>
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
> confs: [default]
> found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
> found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0
> in
> central
> found org.apache.kafka#kafka-clients;2.4.1 in central
> found com.github.luben#zstd-jni;1.4.4-3 in central
> found org.lz4#lz4-java;1.7.1 in central
> found org.xerial.snappy#snappy-java;1.1.7.5 in central
> found org.slf4j#slf4j-api;1.7.30 in central
> found org.spark-project.spark#unused;1.0.0 in central
> found org.apache.commons#commons-pool2;2.6.2 in central
> :: resolution report :: resolve 502ms :: artifacts dl 10ms
> :: modules in use:
> com.github.luben#zstd-jni;1.4.4-3 from central in [default]
> org.apache.commons#commons-pool2;2.6.2 from central in [default]
> org.apache.kafka#kafka-clients;2.4.1 from central in [default]
> org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
> [default]
> org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
> central in [default]
> org.lz4#lz4-java;1.7.1 from central in [default]
> org.slf4j#slf4j-api;1.7.30 from central in [default]
> org.spark-project.spark#unused;1.0.0 from central in [default]
> org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
>
> -
> |  |modules||   artifacts
> |
> |   conf   | number| search|dwnlded|evicted||
> number|dwnlded|
>
> -
> |  default |   9   |   0   |   0   |   0   ||   9   |   0
> |
>
> -
> :: retrieving ::
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
> confs: [default]
> 0 artifacts copied, 9 already retrieved (0kB/13ms)
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> Spark context Web UI available at http://hdp-dev.infodetics.com:4040
> Spark context available as 'sc' (master = yarn, app id =
> application_1593620640299_0015).
> Spark session available as 'spark'.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 3.0.0
>   /_/
>
> Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_212)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
>
> scala> val df = spark.
>  | readStream.
>  | format("kafka").
>  | option("kafka.bootstrap.servers", "XXX").
>  | option("subscribe", "XXX").
>  | option("kafka.sasl.mechanisms", "XXX").
>  | option("kafka.security.protocol", "XXX").
>  | option("kafka.sasl.username","XXX").
>  | option("kafka.sasl.password", "XXX").
>  | option("startingOffsets", "earliest").
>  | load
> java.lang.AbstractMethodError: Method
>
> org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
> is abstract
>   at
>
> org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
>   at
>
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
>   at
>
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
>   ... 57 elided
>
> Looking forward for a response.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark streaming with Kafka

2020-07-02 Thread dwgw
Hi
I am trying to stream kafka topic from spark shell but i am getting the
following error. 
I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
*Java 1.8.0_212*)

*[spark@hdp-dev ~]$ spark-shell --packages
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
:: loading settings :: url =
jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in
central
found org.apache.kafka#kafka-clients;2.4.1 in central
found com.github.luben#zstd-jni;1.4.4-3 in central
found org.lz4#lz4-java;1.7.1 in central
found org.xerial.snappy#snappy-java;1.1.7.5 in central
found org.slf4j#slf4j-api;1.7.30 in central
found org.spark-project.spark#unused;1.0.0 in central
found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
:: modules in use:
com.github.luben#zstd-jni;1.4.4-3 from central in [default]
org.apache.commons#commons-pool2;2.6.2 from central in [default]
org.apache.kafka#kafka-clients;2.4.1 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
[default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
central in [default]
org.lz4#lz4-java;1.7.1 from central in [default]
org.slf4j#slf4j-api;1.7.30 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
   
-
|  |modules||   artifacts  
|
|   conf   | number| search|dwnlded|evicted||
number|dwnlded|
   
-
|  default |   9   |   0   |   0   |   0   ||   9   |   0  
|
   
-
:: retrieving ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/13ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Spark context Web UI available at http://hdp-dev.infodetics.com:4040
Spark context available as 'sc' (master = yarn, app id =
application_1593620640299_0015).
Spark session available as 'spark'.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
  /_/
 
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.


scala> val df = spark.
 | readStream.
 | format("kafka").
 | option("kafka.bootstrap.servers", "XXX").
 | option("subscribe", "XXX").
 | option("kafka.sasl.mechanisms", "XXX").
 | option("kafka.security.protocol", "XXX").
 | option("kafka.sasl.username","XXX").
 | option("kafka.sasl.password", "XXX").
 | option("startingOffsets", "earliest").
 | load
java.lang.AbstractMethodError: Method
org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
is abstract
  at
org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
  at
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
  at
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
  ... 57 elided

Looking forward for a response.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming with Kafka

2020-07-02 Thread dwgw
HiI am trying to stream kafka topic from spark shell but i am getting the
following error. I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM)
64-Bit Server VM, *Java 1.8.0_212*)*[spark@hdp-dev ~]$ spark-shell
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*Ivy Default
Cache set to: /home/spark/.ivy2/cacheThe jars for the packages stored in:
/home/spark/.ivy2/jars:: loading settings :: url =
jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xmlorg.apache.spark#spark-sql-kafka-0-10_2.12
added as a dependency:: resolving dependencies ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0   

confs: [default]found
org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in centralfound
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central   
found org.apache.kafka#kafka-clients;2.4.1 in centralfound
com.github.luben#zstd-jni;1.4.4-3 in centralfound
org.lz4#lz4-java;1.7.1 in centralfound
org.xerial.snappy#snappy-java;1.1.7.5 in centralfound
org.slf4j#slf4j-api;1.7.30 in centralfound
org.spark-project.spark#unused;1.0.0 in centralfound
org.apache.commons#commons-pool2;2.6.2 in central:: resolution report ::
resolve 502ms :: artifacts dl 10ms:: modules in use:   
com.github.luben#zstd-jni;1.4.4-3 from central in [default]   
org.apache.commons#commons-pool2;2.6.2 from central in [default]   
org.apache.kafka#kafka-clients;2.4.1 from central in [default]   
org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in [default]  
 
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from central in
[default]org.lz4#lz4-java;1.7.1 from central in [default]   
org.slf4j#slf4j-api;1.7.30 from central in [default]   
org.spark-project.spark#unused;1.0.0 from central in [default]   
org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]   
-   
|  |modules||   artifacts   |   
|   conf   | number| search|dwnlded|evicted|| number|dwnlded|   
-   
|  default |   9   |   0   |   0   |   0   ||   9   |   0   |   
-::
retrieving ::
org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226   
confs: [default]0 artifacts copied, 9 already retrieved
(0kB/13ms)Setting default log level to "WARN".To adjust logging level use
sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).Spark
context Web UI available at http://hdp-dev.infodetics.com:4040Spark context
available as 'sc' (master = yarn, app id =
application_1593620640299_0015).Spark session available as 'spark'.Welcome
to    __ / __/__  ___ _/ /___\ \/ _ \/ _ `/
__/  '_/   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0  /_/ Using
Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_212)Type in expressions to have them evaluated.Type :help for more
information.scala> val df = spark. | readStream. | format("kafka").
| option("kafka.bootstrap.servers", "XXX"). | option("subscribe",
"XXX"). | option("kafka.sasl.mechanisms", "XXX"). |
option("kafka.security.protocol", "XXX"). |
option("kafka.sasl.username","XXX"). | option("kafka.sasl.password",
"XXX"). | option("startingOffsets", "earliest"). |
loadjava.lang.AbstractMethodError: Method
org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
is abstract  at
org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
 
at
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
 
at
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
 
... 57 elidedLooking forward for a response.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

How does Spark Streaming handle late data?

2020-07-02 Thread lafeier








Hi, AllI am using Spark Streaming for real-time data, but the data is delayed.My batch time is set to 15 minutes and then Spark steaming trigger calculation at 15 minutes,30 minutes,45 minutes and 60 minutes, but my data delay is 5 minutes, what should I do?Can spark be calculated at 20,35,50,05 minutes?If not, how do these issues be handled in Spark steaming?








 








lafeier






812747...@qq.com




















签名由
网易邮箱大师
定制

 




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Running Apache Spark Streaming on the GraalVM Native Image

2020-07-01 Thread Pasha Finkelshteyn
Hi Ivo, 

I believe there's absolutely no way that Spark will work on GraalVM
Native Image because Spark generates code and loads classes in runtime,
while GraalVM Native Image works only in closed world and has no any way
to load classes which are not present in classpath at compie time.


On 20/07/01 09:56AM, ivo.kn...@t-online.de wrote:
> Hi guys,
>  
> so I want to get Apache Spark to run on the GraalVM Native Image in a 
> simple single-node streaming application, but I get the following error, 
> when trying to build the native image: (check attached file)
>  
> And as I researched online, there seems to be no successful combination of 
> Spark and GraalVM Native Image. Did anyone ever succeed and how?
>  
> Best regards,
>  
> Ivo
>  
> 


> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-- 
Regards,
Pasha

Big Data Tools @ JetBrains


signature.asc
Description: PGP signature


Running Apache Spark Streaming on the GraalVM Native Image

2020-07-01 Thread ivo.kn...@t-online.de
Hi guys,
 
so I want to get Apache Spark to run on the GraalVM Native Image in a 
simple single-node streaming application, but I get the following error, 
when trying to build the native image: (check attached file)
 
And as I researched online, there seems to be no successful combination of 
Spark and GraalVM Native Image. Did anyone ever succeed and how?
 
Best regards,
 
Ivo
 


Spark GraalVM Native Image Error
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

High Availability for spark streaming application running in kubernetes

2020-06-24 Thread Shenson Joseph
Hello,

I have a spark streaming application running in kubernetes and we use spark
operator to submit spark jobs. Any suggestion on

1. How to handle high availability for spark streaming applications.
2. What would be the best approach to handle high availability of
checkpoint data if we don't use HDFS?

Thanks
Shenson


[Spark Streaming] predicate pushdown in custom connector source.

2020-06-23 Thread Rahul Kumar


I'm trying to implement structured spark streaming source for a custom
connector. I'm wondering if it is possible to do predicate pushdown in the
streaming source? I'm aware this may be something native to the datastore in
question. However, I would really appreciate if someone can redirect me to
an existing connector, which does it. 

Thanks,
Rahul



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Apache Spark][Streaming Job][Checkpoint]Spark job failed on Checkpoint recovery with Batch not found error

2020-05-28 Thread taylorwu
Hi,

We have a Spark 2.4 job failed on Checkpoint recovery every few hours with
the following errors (from the Driver Log):

driver spark-kubernetes-driver ERROR 20:38:51 ERROR MicroBatchExecution:
Query impressionUpdate [id = 54614900-4145-4d60-8156-9746ffc13d1f, runId =
3637c2f3-49b6-40c2-b6d0-7edb28361c5d] terminated with error
java.lang.IllegalStateException: batch 946 doesn't exist
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:406)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

And the executor logs show this error:

 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

How should I fix this?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming Memory

2020-05-17 Thread Ali Gouta
The spark UI is misleading in spark 2.4.4. I moved to spark 2.4.5 and it
fixed it. Now, your problem should be somewhere else. Probably related to
memory consumption but not the one you see in the UI.

Best regards,
Ali Gouta.

On Sun, May 17, 2020 at 7:36 PM András Kolbert 
wrote:

> Hi,
>
> I have a streaming job (Spark 2.4.4) in which the memory usage keeps
> increasing over time.
>
> Periodically (20-25) mins the executors fall over
> (org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 6987) due to out of memory. In the UI, I can see that
> the memory keeps increasing batch by batch, although I do not keep more
> data in memory (I keep unpersisting, checkpointing and caching new data
> frames though), the storage tabs shows only the expected 4 objects overtime.
>
> I wish I just missed a parameter in the spark configuration (like garbage
> collection, reference tracking, etc) that would solve my issue. I have seen
> a few JIRA tickets around memory leak (SPARK-19644
> , SPARK-29055
> , SPARK-29321
> ) it might be the same
> issue?
>
>  ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
>  ('spark.cleaner.periodicGC.interval', '1min'),
>  ('spark.cleaner.referenceTracking','true'),
>  ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
>  ('spark.sql.streaming.minBatchesToRetain', '2'),
>  ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
>  ('spark.ui.retainedJobs','50' ),
>  ('spark.ui.retainedStages','50'),
>  ('spark.ui.retainedTasks','500'),
>  ('spark.worker.ui.retainedExecutors','50'),
>  ('spark.worker.ui.retainedDrivers','50'),
>  ('spark.sql.ui.retainedExecutions','50'),
>  ('spark.streaming.ui.retainedBatches','1440'),
>  ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps')
>
> I've tried lowering the spark.streaming.ui.retainedBatches to 8, did not
> help.
>
> The application works fine apart from the fact that the processing
> some batches take longer (when the executors fall over).
>
> [image: image.png]
> [image: image.png]
>
>
> Any ideas?
>
> I've attached my code.
>
>
> Thanks,
> Andras
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Spark Streaming Memory

2020-05-17 Thread András Kolbert
Hi,

I have a streaming job (Spark 2.4.4) in which the memory usage keeps
increasing over time.

Periodically (20-25) mins the executors fall over
(org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 6987) due to out of memory. In the UI, I can see that
the memory keeps increasing batch by batch, although I do not keep more
data in memory (I keep unpersisting, checkpointing and caching new data
frames though), the storage tabs shows only the expected 4 objects overtime.

I wish I just missed a parameter in the spark configuration (like garbage
collection, reference tracking, etc) that would solve my issue. I have seen
a few JIRA tickets around memory leak (SPARK-19644
, SPARK-29055
, SPARK-29321
) it might be the same
issue?

 ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
 ('spark.cleaner.periodicGC.interval', '1min'),
 ('spark.cleaner.referenceTracking','true'),
 ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
 ('spark.sql.streaming.minBatchesToRetain', '2'),
 ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
 ('spark.ui.retainedJobs','50' ),
 ('spark.ui.retainedStages','50'),
 ('spark.ui.retainedTasks','500'),
 ('spark.worker.ui.retainedExecutors','50'),
 ('spark.worker.ui.retainedDrivers','50'),
 ('spark.sql.ui.retainedExecutions','50'),
 ('spark.streaming.ui.retainedBatches','1440'),
 ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps')

I've tried lowering the spark.streaming.ui.retainedBatches to 8, did not
help.

The application works fine apart from the fact that the processing
some batches take longer (when the executors fall over).

[image: image.png]
[image: image.png]


Any ideas?

I've attached my code.


Thanks,
Andras
import sys
from datetime import datetime, timedelta
import numpy as np
import time
from pathlib import Path
import json
from subprocess import PIPE, run
import pandas as pd
import pyarrow
import gc

from pyspark import SparkContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import ArrayType, IntegerType, DoubleType, StringType, 
BooleanType
from pyspark.sql.functions import col, row_number, udf, collect_list, udf, 
array, struct, lit, log, exp
from pyspark.sql import DataFrame, Window

code_version = "03"
environment = "PREPRD"

def_config = spark.sparkContext._conf.getAll()
conf = spark.sparkContext._conf.setAll(
[('spark.app.name', f'application_Streaming_{environment}_v{code_version}'),
 ('spark.executor.memory', '3500M'),
 ('spark.executor.cores', '2'),
 ('spark.cores.max', '4'),
 ('spark.driver.memory', '8g'),
 ('archives', 
'hdfs://node-master:9000/data/application/deployment/env/application_scoring_v1.tar.gz#application_scoring_v1'),
 ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
 ('spark.cleaner.periodicGC.interval', '1min'),
 ('spark.cleaner.referenceTracking','true'),
 ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
 ('spark.sql.streaming.minBatchesToRetain', '2'),
 ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
 ('spark.ui.retainedJobs','50' ),
 ('spark.ui.retainedStages','50'),
 ('spark.ui.retainedTasks','500'),
 ('spark.worker.ui.retainedExecutors','50'),
 ('spark.worker.ui.retainedDrivers','50'),
 ('spark.sql.ui.retainedExecutions','50'),
 ('spark.streaming.ui.retainedBatches','1440'),
 ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps')
 ])

spark.sparkContext.stop()

os.environ['LOG_DIRS'] = '/application/logs/application_logs'

spark = SparkSession \
.builder \
.config(conf=conf) \
.getOrCreate()
sc = spark.sparkContext
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

sc.addFile('hdfs:///data/application/deployment/shared_utils.tar')
sc.addPyFile('hdfs:///data/application/deployment/application_payload.py')
sc.addPyFile('hdfs:///data/application/deployment/spark_logging.py')
sc.setLogLevel("ERROR")

from hdfs import InsecureClient
from application_payload import get_channel_game_info, get_application_output, 
get_predictions_pure
from shared_utils.KafkaUtils.kafka_utils import KafkaConnect
import spark_logging as logging
import spark_misc as sm

logger = logging.getLogger("driver", 
logfile=f"{sc._jsc.sc().applicationId()}.log")
init_handler = logging.StreamHandler()
init_handler.setLevel(logging.DEBUG)

Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Jungtaek Lim
Replied inline:

On Sun, May 3, 2020 at 6:25 PM Magnus Nilsson  wrote:

> Thank you, so that would mean spark gets the current latest offset(s) when
> the trigger fires and then process all available messages in the topic upto
> and including that offset as long as maxOffsetsPerTrigger is the default of
> None (or large enought to handle all available messages).
>

Yes it starts from the offset of latest batch. `maxOffsetsPerTrigger` will
be ignored starting from Spark 3.0.0, which means for Spark 2.x it's still
affecting even Trigger.Once is used I guess.


>
> I think the word micro-batch confused me (more like mega-batch in some
> cases). It makes sense though, this makes Trigger.Once a fixed interval
> trigger that's only fired once and not repeatedly.
>

"micro" is relative - though Spark by default processes all available
inputs per batch, in most cases you'll want to make the batch size
(interval) as small as possible, as it defines the latency of the output.
Trigger.Once is an unusual case in streaming workload - that's more alike
continuous execution of "batch". I refer "continuous" as picking up latest
context which is the characteristic of streaming query, hence hybrid one.


>
>
> On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim 
> wrote:
>
>> If I understand correctly, Trigger.once executes only one micro-batch and
>> terminates, that's all. Your understanding of structured streaming applies
>> there as well.
>>
>> It's like a hybrid approach as bringing incremental processing from
>> micro-batch but having processing interval as batch. That said, while it
>> enables to get both sides of benefits, it's basically structured streaming,
>> inheriting all the limitations on the structured streaming, compared to the
>> batch query.
>>
>> Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
>> Trigger.once will "ignore" the read limit per micro-batch on data source
>> (like maxOffsetsPerTrigger) and process all available input as possible.
>> (Data sources should migrate to the new API to take effect, but works for
>> built-in data sources like file and Kafka.)
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-30669
>>
>> 2020년 5월 2일 (토) 오후 5:35, Magnus Nilsson 님이 작성:
>>
>>> I've always had a question about Trigger.Once that I never got around to
>>> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>>>
>>> Will Trigger.Once get the last offset(s) when it starts and then quit
>>> once it hits this offset(s) or will the job run until no new messages is
>>> added to the topic for a particular amount of time?
>>>
>>> br,
>>>
>>> Magnus
>>>
>>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>>>
>>>> Hi Rishi,
>>>>
>>>> That is exactly why Trigger.Once was created for Structured Streaming.
>>>> The way we look at streaming is that it doesn't have to be always real
>>>> time, or 24-7 always on. We see streaming as a workflow that you have to
>>>> repeat indefinitely. See this blog post for more details!
>>>>
>>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I recently started playing with spark streaming, and checkpoint
>>>>> location feature looks very promising. I wonder if anyone has an opinion
>>>>> about using spark streaming with checkpoint location option as a slow 
>>>>> batch
>>>>> processing solution. What would be the pros and cons of utilizing 
>>>>> streaming
>>>>> with checkpoint location feature to achieve fault tolerance in batch
>>>>> processing application?
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Rishi Shah
>>>>>
>>>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Magnus Nilsson
Thank you, so that would mean spark gets the current latest offset(s) when
the trigger fires and then process all available messages in the topic upto
and including that offset as long as maxOffsetsPerTrigger is the default of
None (or large enought to handle all available messages).

I think the word micro-batch confused me (more like mega-batch in some
cases). It makes sense though, this makes Trigger.Once a fixed interval
trigger that's only fired once and not repeatedly.


On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim 
wrote:

> If I understand correctly, Trigger.once executes only one micro-batch and
> terminates, that's all. Your understanding of structured streaming applies
> there as well.
>
> It's like a hybrid approach as bringing incremental processing from
> micro-batch but having processing interval as batch. That said, while it
> enables to get both sides of benefits, it's basically structured streaming,
> inheriting all the limitations on the structured streaming, compared to the
> batch query.
>
> Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
> Trigger.once will "ignore" the read limit per micro-batch on data source
> (like maxOffsetsPerTrigger) and process all available input as possible.
> (Data sources should migrate to the new API to take effect, but works for
> built-in data sources like file and Kafka.)
>
> 1. https://issues.apache.org/jira/browse/SPARK-30669
>
> 2020년 5월 2일 (토) 오후 5:35, Magnus Nilsson 님이 작성:
>
>> I've always had a question about Trigger.Once that I never got around to
>> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>>
>> Will Trigger.Once get the last offset(s) when it starts and then quit
>> once it hits this offset(s) or will the job run until no new messages is
>> added to the topic for a particular amount of time?
>>
>> br,
>>
>> Magnus
>>
>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>>
>>> Hi Rishi,
>>>
>>> That is exactly why Trigger.Once was created for Structured Streaming.
>>> The way we look at streaming is that it doesn't have to be always real
>>> time, or 24-7 always on. We see streaming as a workflow that you have to
>>> repeat indefinitely. See this blog post for more details!
>>>
>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>>
>>> Best,
>>> Burak
>>>
>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I recently started playing with spark streaming, and checkpoint
>>>> location feature looks very promising. I wonder if anyone has an opinion
>>>> about using spark streaming with checkpoint location option as a slow batch
>>>> processing solution. What would be the pros and cons of utilizing streaming
>>>> with checkpoint location feature to achieve fault tolerance in batch
>>>> processing application?
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Jungtaek Lim
If I understand correctly, Trigger.once executes only one micro-batch and
terminates, that's all. Your understanding of structured streaming applies
there as well.

It's like a hybrid approach as bringing incremental processing from
micro-batch but having processing interval as batch. That said, while it
enables to get both sides of benefits, it's basically structured streaming,
inheriting all the limitations on the structured streaming, compared to the
batch query.

Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
Trigger.once will "ignore" the read limit per micro-batch on data source
(like maxOffsetsPerTrigger) and process all available input as possible.
(Data sources should migrate to the new API to take effect, but works for
built-in data sources like file and Kafka.)

1. https://issues.apache.org/jira/browse/SPARK-30669

2020년 5월 2일 (토) 오후 5:35, Magnus Nilsson 님이 작성:

> I've always had a question about Trigger.Once that I never got around to
> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>
> Will Trigger.Once get the last offset(s) when it starts and then quit once
> it hits this offset(s) or will the job run until no new messages is added
> to the topic for a particular amount of time?
>
> br,
>
> Magnus
>
> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>
>> Hi Rishi,
>>
>> That is exactly why Trigger.Once was created for Structured Streaming.
>> The way we look at streaming is that it doesn't have to be always real
>> time, or 24-7 always on. We see streaming as a workflow that you have to
>> repeat indefinitely. See this blog post for more details!
>>
>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>
>> Best,
>> Burak
>>
>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>> wrote:
>>
>>> Hi All,
>>>
>>> I recently started playing with spark streaming, and checkpoint location
>>> feature looks very promising. I wonder if anyone has an opinion about using
>>> spark streaming with checkpoint location option as a slow batch processing
>>> solution. What would be the pros and cons of utilizing streaming with
>>> checkpoint location feature to achieve fault tolerance in batch processing
>>> application?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Magnus Nilsson
I've always had a question about Trigger.Once that I never got around to
ask or test for myself. If you have a 24/7 stream to a Kafka topic.

Will Trigger.Once get the last offset(s) when it starts and then quit once
it hits this offset(s) or will the job run until no new messages is added
to the topic for a particular amount of time?

br,

Magnus

On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:

> Hi Rishi,
>
> That is exactly why Trigger.Once was created for Structured Streaming. The
> way we look at streaming is that it doesn't have to be always real time, or
> 24-7 always on. We see streaming as a workflow that you have to repeat
> indefinitely. See this blog post for more details!
>
> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>
> Best,
> Burak
>
> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I recently started playing with spark streaming, and checkpoint location
>> feature looks very promising. I wonder if anyone has an opinion about using
>> spark streaming with checkpoint location option as a slow batch processing
>> solution. What would be the pros and cons of utilizing streaming with
>> checkpoint location feature to achieve fault tolerance in batch processing
>> application?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Rishi Shah
Thanks Burak! Appreciate it. This makes sense.

How do you suggest we make sure resulting data doesn't produce tiny files?
If we are not on databricks yet and can not leverage delta lake features?
Also checkpointing feature, do you have active blog/article I can take
a look at to try out an example?

On Fri, May 1, 2020 at 7:22 PM Burak Yavuz  wrote:

> Hi Rishi,
>
> That is exactly why Trigger.Once was created for Structured Streaming. The
> way we look at streaming is that it doesn't have to be always real time, or
> 24-7 always on. We see streaming as a workflow that you have to repeat
> indefinitely. See this blog post for more details!
>
> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>
> Best,
> Burak
>
> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I recently started playing with spark streaming, and checkpoint location
>> feature looks very promising. I wonder if anyone has an opinion about using
>> spark streaming with checkpoint location option as a slow batch processing
>> solution. What would be the pros and cons of utilizing streaming with
>> checkpoint location feature to achieve fault tolerance in batch processing
>> application?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Burak Yavuz
Hi Rishi,

That is exactly why Trigger.Once was created for Structured Streaming. The
way we look at streaming is that it doesn't have to be always real time, or
24-7 always on. We see streaming as a workflow that you have to repeat
indefinitely. See this blog post for more details!
https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

Best,
Burak

On Fri, May 1, 2020 at 2:55 PM Rishi Shah  wrote:

> Hi All,
>
> I recently started playing with spark streaming, and checkpoint location
> feature looks very promising. I wonder if anyone has an opinion about using
> spark streaming with checkpoint location option as a slow batch processing
> solution. What would be the pros and cons of utilizing streaming with
> checkpoint location feature to achieve fault tolerance in batch processing
> application?
>
> --
> Regards,
>
> Rishi Shah
>


[spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Rishi Shah
Hi All,

I recently started playing with spark streaming, and checkpoint location
feature looks very promising. I wonder if anyone has an opinion about using
spark streaming with checkpoint location option as a slow batch processing
solution. What would be the pros and cons of utilizing streaming with
checkpoint location feature to achieve fault tolerance in batch processing
application?

-- 
Regards,

Rishi Shah


Re: Spark Streaming not working

2020-04-14 Thread Gerard Maas
Hi,

Could you share the code that you're using to configure the connection to
the Kafka broker?

This is a bread-and-butter feature. My first thought is that there's
something in your particular setup that prevents this from working.

kind regards, Gerard.

On Fri, Apr 10, 2020 at 7:34 PM Debabrata Ghosh 
wrote:

> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>


Re: Spark Streaming not working

2020-04-14 Thread Gabor Somogyi
Sorry, hit the send accidentally...

The symptom is simple, the broker is not responding in 120 seconds.
That's the reason why Debabrata asked the broker config.

What I can suggest is to check the previous printout which logs the Kafka
consumer settings.
With the mentioned settings you can start a console consumer on the exact
same host where the executor ran...
If that works you can open a Spark jira with driver and executor logs,
otherwise fix the connection issue.

BR,
G


On Tue, Apr 14, 2020 at 1:32 PM Gabor Somogyi 
wrote:

> The symptom is simple, the broker is not responding in 120 seconds.
> That's the reason why Debabrata asked the broker config.
>
> What I can suggest is to check the previous printout which logs the Kafka
> consumer settings.
> With
>
>
> On Tue, Apr 14, 2020 at 11:44 AM ZHANG Wei  wrote:
>
>> Here is the assertion error message format:
>>
>>s"Failed to get records for $groupId $topic $partition $offset after
>> polling for $timeout")
>>
>> You might have to check the kafka service with the error log:
>>
>> > 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0
>> (TID 24)
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling
>> for 12
>>
>> Cheers,
>> -z
>>
>> ____
>> From: Debabrata Ghosh 
>> Sent: Saturday, April 11, 2020 2:25
>> To: user
>> Subject: Re: Spark Streaming not working
>>
>> Any solution please ?
>>
>> On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh > <mailto:mailford...@gmail.com>> wrote:
>> Hi,
>> I have a spark streaming application where Kafka is producing
>> records but unfortunately spark streaming isn't able to consume those.
>>
>> I am hitting the following error:
>>
>> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID
>> 24)
>> java.lang.AssertionError: assertion failed: Failed to get records for
>> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling
>> for 12
>> at scala.Predef$.assert(Predef.scala:170)
>> at
>> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> at
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>> at
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>>
>> Would you please be able to help with a resolution.
>>
>> Thanks,
>> Debu
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark Streaming not working

2020-04-14 Thread Gabor Somogyi
The symptom is simple, the broker is not responding in 120 seconds.
That's the reason why Debabrata asked the broker config.

What I can suggest is to check the previous printout which logs the Kafka
consumer settings.
With


On Tue, Apr 14, 2020 at 11:44 AM ZHANG Wei  wrote:

> Here is the assertion error message format:
>
>s"Failed to get records for $groupId $topic $partition $offset after
> polling for $timeout")
>
> You might have to check the kafka service with the error log:
>
> > 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0
> (TID 24)
> > java.lang.AssertionError: assertion failed: Failed to get records for
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling
> for 12
>
> Cheers,
> -z
>
> 
> From: Debabrata Ghosh 
> Sent: Saturday, April 11, 2020 2:25
> To: user
> Subject: Re: Spark Streaming not working
>
> Any solution please ?
>
> On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh  <mailto:mailford...@gmail.com>> wrote:
> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID
> 24)
> java.lang.AssertionError: assertion failed: Failed to get records for
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling
> for 12
> at scala.Predef$.assert(Predef.scala:170)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming not working

2020-04-14 Thread ZHANG Wei
Here is the assertion error message format:

   s"Failed to get records for $groupId $topic $partition $offset after polling 
for $timeout")

You might have to check the kafka service with the error log:

> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12

Cheers,
-z


From: Debabrata Ghosh 
Sent: Saturday, April 11, 2020 2:25
To: user
Subject: Re: Spark Streaming not working

Any solution please ?

On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh 
mailto:mailford...@gmail.com>> wrote:
Hi,
I have a spark streaming application where Kafka is producing records 
but unfortunately spark streaming isn't able to consume those.

I am hitting the following error:

20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
java.lang.AssertionError: assertion failed: Failed to get records for 
spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
12
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)

Would you please be able to help with a resolution.

Thanks,
Debu

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming not working

2020-04-10 Thread Debabrata Ghosh
Any solution please ?

On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh 
wrote:

> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>


Re: Spark Streaming not working

2020-04-10 Thread Chenguang He
unsubscribe


Re: Spark Streaming not working

2020-04-10 Thread Debabrata Ghosh
Yes the Kafka producer is producing records from the same host - Rechecked
Kafka connection and the connection is there. Came across this URL but
unable to understand it

https://stackoverflow.com/questions/42264669/spark-streaming-assertion-failed-failed-to-get-records-for-spark-executor-a-gro

On Fri, Apr 10, 2020 at 11:14 PM Srinivas V  wrote:

> Check if your broker details are correct, verify if you have network
> connectivity to your client box and Kafka broker server host.
>
> On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh 
> wrote:
>
>> Hi,
>>     I have a spark streaming application where Kafka is producing
>> records but unfortunately spark streaming isn't able to consume those.
>>
>> I am hitting the following error:
>>
>> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
>> java.lang.AssertionError: assertion failed: Failed to get records for 
>> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
>> 12
>>  at scala.Predef$.assert(Predef.scala:170)
>>  at 
>> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>>  at 
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>>  at 
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>>
>>
>> Would you please be able to help with a resolution.
>>
>> Thanks,
>> Debu
>>
>


Re: Spark Streaming not working

2020-04-10 Thread Srinivas V
Check if your broker details are correct, verify if you have network
connectivity to your client box and Kafka broker server host.

On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh 
wrote:

> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
>
> Would you please be able to help with a resolution.
>
> Thanks,
> Debu
>


Spark Streaming not working

2020-04-10 Thread Debabrata Ghosh
Hi,
I have a spark streaming application where Kafka is producing
records but unfortunately spark streaming isn't able to consume those.

I am hitting the following error:

20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
java.lang.AssertionError: assertion failed: Failed to get records for
spark-executor-service-spark-ingestion dice-ingestion 11 0 after
polling for 12
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)


Would you please be able to help with a resolution.

Thanks,
Debu


Re: Spark Streaming on Compact Kafka topic - consumers 1 message per partition per batch

2020-04-08 Thread Hrishikesh Mishra
It seems, I found the issue. The actual problem is something related to
back pressure. When I am adding these config
*spark.streaming.kafka.maxRatePerPartition* or
*spark.streaming.backpressure.initialRate* (the of these configs are 100).
After that it starts consuming one message per partition per batch. Not why
it's happening.


On Thu, Apr 2, 2020 at 8:48 AM Waleed Fateem 
wrote:

> Well this is interesting. Not sure if this is the expected behavior. The
> log messages you have referenced are actually printed out by the Kafka
> Consumer itself (org.apache.kafka.clients.consumer.internals.Fetcher).
>
> That log message belongs to a new feature added starting with Kafka 1.1:
> https://issues.apache.org/jira/browse/KAFKA-6397
>
> I'm assuming then that you're using Spark 2.4?
>
> From Kafka's perspective, when you do a describe on your
> demandIngestion.SLTarget topic, does that look okay? All partitions are
> available with a valid leader.
>
> The other thing I'm curious about, after you
> enabled spark.streaming.kafka.allowNonConsecutiveOffsets, did you try going
> back to the older group.id and do you see the same behavior? Was there a
> reason you chose to start reading again from the beginning by using a new
> consumer group rather then sticking to the same consumer group?
>
> In your application, are you manually committing offsets to Kafka?
>
> Regards,
>
> Waleed
>
> On Wed, Apr 1, 2020 at 1:31 AM Hrishikesh Mishra 
> wrote:
>
>> Hi
>>
>> Our Spark streaming job was working fine as expected (the number of
>> events to process in a batch). But due to some reasons, we added compaction
>> on Kafka topic and restarted the job. But after restart it was failing for
>> below reason:
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 16 in stage 2.0 failed 4 times, most recent failure: Lost task 16.3 in
>> stage 2.0 (TID 231, 10.34.29.38, executor 4):
>> java.lang.IllegalArgumentException: requirement failed: Got wrong record
>> for spark-executor-pc-nfr-loop-31-march-2020-4 demandIngestion.SLTarget-39
>> even after seeking to offset 106847 got offset 199066 instead. If this is a
>> compacted topic, consider enabling
>> spark.streaming.kafka.allowNonConsecutiveOffsets
>>   at scala.Predef$.require(Predef.scala:224)
>>   at
>> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
>>
>>
>>
>> So, I added spark.streaming.kafka.allowNonConsecutiveOffsets: true  in
>> spark config and I changed the group name to consume from beginning. Now
>> the problem is, it reading only one message from per partition. So if a
>> topic has 50 partitions then its reading 50 message per batch (batch
>> duration is 5 sec).
>>
>> The topic is 1M records and consumer has huge lag.
>>
>>
>> Driver log which fetches 1 message per partition.
>>
>> 20/03/31 18:25:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211951.
>> 20/03/31 18:26:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211952.
>> 20/03/31 18:26:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211953.
>> 20/03/31 18:26:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211954.
>> 20/03/31 18:26:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211955.
>> 20/03/31 18:26:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211956.
>> 20/03/31 18:26:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211957.
>> 20/03/31 18:26:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211958.
>> 20/03/31 18:26:35 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211959.
>> 20/03/31 18:26:40 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211960.
>> 20/03/31 18:26:45 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211961.
>> 20/03/31 18:26:50 INFO Fetcher: [groupId=pc-nfr-loop-

Re: Spark Streaming on Compact Kafka topic - consumers 1 message per partition per batch

2020-04-01 Thread Waleed Fateem
Well this is interesting. Not sure if this is the expected behavior. The
log messages you have referenced are actually printed out by the Kafka
Consumer itself (org.apache.kafka.clients.consumer.internals.Fetcher).

That log message belongs to a new feature added starting with Kafka 1.1:
https://issues.apache.org/jira/browse/KAFKA-6397

I'm assuming then that you're using Spark 2.4?

>From Kafka's perspective, when you do a describe on your
demandIngestion.SLTarget topic, does that look okay? All partitions are
available with a valid leader.

The other thing I'm curious about, after you
enabled spark.streaming.kafka.allowNonConsecutiveOffsets, did you try going
back to the older group.id and do you see the same behavior? Was there a
reason you chose to start reading again from the beginning by using a new
consumer group rather then sticking to the same consumer group?

In your application, are you manually committing offsets to Kafka?

Regards,

Waleed

On Wed, Apr 1, 2020 at 1:31 AM Hrishikesh Mishra 
wrote:

> Hi
>
> Our Spark streaming job was working fine as expected (the number of events
> to process in a batch). But due to some reasons, we added compaction on
> Kafka topic and restarted the job. But after restart it was failing for
> below reason:
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 16
> in stage 2.0 failed 4 times, most recent failure: Lost task 16.3 in stage
> 2.0 (TID 231, 10.34.29.38, executor 4): java.lang.IllegalArgumentException:
> requirement failed: Got wrong record for
> spark-executor-pc-nfr-loop-31-march-2020-4 demandIngestion.SLTarget-39 even
> after seeking to offset 106847 got offset 199066 instead. If this is a
> compacted topic, consider enabling
> spark.streaming.kafka.allowNonConsecutiveOffsets
>   at scala.Predef$.require(Predef.scala:224)
>   at
> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
>
>
>
> So, I added spark.streaming.kafka.allowNonConsecutiveOffsets: true  in
> spark config and I changed the group name to consume from beginning. Now
> the problem is, it reading only one message from per partition. So if a
> topic has 50 partitions then its reading 50 message per batch (batch
> duration is 5 sec).
>
> The topic is 1M records and consumer has huge lag.
>
>
> Driver log which fetches 1 message per partition.
>
> 20/03/31 18:25:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211951.
> 20/03/31 18:26:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211952.
> 20/03/31 18:26:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211953.
> 20/03/31 18:26:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211954.
> 20/03/31 18:26:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211955.
> 20/03/31 18:26:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211956.
> 20/03/31 18:26:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211957.
> 20/03/31 18:26:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211958.
> 20/03/31 18:26:35 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211959.
> 20/03/31 18:26:40 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211960.
> 20/03/31 18:26:45 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211961.
> 20/03/31 18:26:50 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211962.
> 20/03/31 18:26:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211963.
> 20/03/31 18:27:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211964.
> 20/03/31 18:27:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211965.
> 20/03/31 18:27:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211966.
> 20/03

Spark Streaming on Compact Kafka topic - consumers 1 message per partition per batch

2020-04-01 Thread Hrishikesh Mishra
Hi

Our Spark streaming job was working fine as expected (the number of events
to process in a batch). But due to some reasons, we added compaction on
Kafka topic and restarted the job. But after restart it was failing for
below reason:


org.apache.spark.SparkException: Job aborted due to stage failure: Task 16
in stage 2.0 failed 4 times, most recent failure: Lost task 16.3 in stage
2.0 (TID 231, 10.34.29.38, executor 4): java.lang.IllegalArgumentException:
requirement failed: Got wrong record for
spark-executor-pc-nfr-loop-31-march-2020-4 demandIngestion.SLTarget-39 even
after seeking to offset 106847 got offset 199066 instead. If this is a
compacted topic, consider enabling
spark.streaming.kafka.allowNonConsecutiveOffsets
  at scala.Predef$.require(Predef.scala:224)
  at
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)



So, I added spark.streaming.kafka.allowNonConsecutiveOffsets: true  in
spark config and I changed the group name to consume from beginning. Now
the problem is, it reading only one message from per partition. So if a
topic has 50 partitions then its reading 50 message per batch (batch
duration is 5 sec).

The topic is 1M records and consumer has huge lag.


Driver log which fetches 1 message per partition.

20/03/31 18:25:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211951.
20/03/31 18:26:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211952.
20/03/31 18:26:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211953.
20/03/31 18:26:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211954.
20/03/31 18:26:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211955.
20/03/31 18:26:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211956.
20/03/31 18:26:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211957.
20/03/31 18:26:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211958.
20/03/31 18:26:35 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211959.
20/03/31 18:26:40 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211960.
20/03/31 18:26:45 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211961.
20/03/31 18:26:50 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211962.
20/03/31 18:26:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211963.
20/03/31 18:27:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211964.
20/03/31 18:27:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211965.
20/03/31 18:27:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211966.
20/03/31 18:27:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211967.
20/03/31 18:27:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211968.
20/03/31 18:27:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45 to offset 211969.
20/03/31 18:27:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
Resetting offset for partition demandIngestion.SLTarget-45  to offset
211970.



Spark Config (batch.duration: 5, using Spark Stream) :

  spark.shuffle.service.enabled: "true"

  spark.streaming.backpressure.enabled: "true"

  spark.streaming.concurrentJobs: "1"

  spark.executor.extraJavaOptions: "-XX:+UseConcMarkSweepGC"

  spark.streaming.backpressure.pid.minRate: 1500

  spark.streaming.backpressure.initialRate: 100

  spark.streaming.kafka.allowNonConsecutiveOffsets: true



Is there any issue in my configuration or something special required with
compact Kafka topic which I'm missing?




Regards
Hrishi


Re: Spark Streaming Code

2020-03-28 Thread Jungtaek Lim
To get any meaningful answers you may want to provide the
information/context as much as possible. e.g. Spark version, which
behavior/output was expected (and why you think) and how it behaves
actually.

On Sun, Mar 29, 2020 at 3:37 AM Siva Samraj  wrote:

> Hi Team,
>
> Need help on windowing & watermark concept.  This code is not working as
> expected.
>
> package com.jiomoney.streaming
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.ProcessingTime
>
> object SlingStreaming {
>   def main(args: Array[String]): Unit = {
> val spark = SparkSession
>   .builder()
>   .master("local[*]")
>   .appName("Coupons_ViewingNow")
>   .getOrCreate()
>
> import spark.implicits._
>
> val checkpoint_path = "/opt/checkpoints/"
>
> val ks = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test")
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafka.replica.fetch.max.bytes", "16777216")
>   .load()
>
> val dfDeviceid = ks
>   .withColumn("val", ($"value").cast("string"))
>   .withColumn("count1", get_json_object(($"val"), "$.a"))
>   .withColumn("deviceId", get_json_object(($"val"), "$.b"))
>   .withColumn("timestamp", current_timestamp())
>
>
> val final_ids = dfDeviceid
>   .withColumn("processing_time", current_timestamp())
>   .withWatermark("processing_time","1 minutes")
>   .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
>   .agg(sum($"count1") as "total")
>
> val t = final_ids
>   .select(to_json(struct($"*")) as "value")
>   .writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("topic", "sub_topic")
>   .option("checkpointLocation", checkpoint_path)
>   .outputMode("append")
>   .trigger(ProcessingTime("1 seconds"))
>   .start()
>
> t.awaitTermination()
>
>   }
>
> }
>
>
> Thanks
>
>


Spark Streaming Code

2020-03-28 Thread Siva Samraj
Hi Team,

Need help on windowing & watermark concept.  This code is not working as
expected.

package com.jiomoney.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime

object SlingStreaming {
  def main(args: Array[String]): Unit = {
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("Coupons_ViewingNow")
  .getOrCreate()

import spark.implicits._

val checkpoint_path = "/opt/checkpoints/"

val ks = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "16777216")
  .load()

val dfDeviceid = ks
  .withColumn("val", ($"value").cast("string"))
  .withColumn("count1", get_json_object(($"val"), "$.a"))
  .withColumn("deviceId", get_json_object(($"val"), "$.b"))
  .withColumn("timestamp", current_timestamp())


val final_ids = dfDeviceid
  .withColumn("processing_time", current_timestamp())
  .withWatermark("processing_time","1 minutes")
  .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
  .agg(sum($"count1") as "total")

val t = final_ids
  .select(to_json(struct($"*")) as "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sub_topic")
  .option("checkpointLocation", checkpoint_path)
  .outputMode("append")
  .trigger(ProcessingTime("1 seconds"))
  .start()

t.awaitTermination()

  }

}


Thanks


Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-05 Thread Something Something
Yes that was it! It seems it only works if input data is continuously
flowing. I had stopped the input job because I had enough data but it seems
timeouts work only if the data is continuously fed. Not sure why it's
designed that way. Makes it a bit harder to write unit/integration tests
BUT I am sure there's a reason why it's designed this way. Thanks.

On Wed, Mar 4, 2020 at 6:31 PM Tathagata Das 
wrote:

> Make sure that you are continuously feeding data into the query to trigger
> the batches. only then timeouts are processed.
> See the timeout behavior details here -
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState
>
> On Wed, Mar 4, 2020 at 2:51 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I've set the timeout duration to "2 minutes" as follows:
>>
>> def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: 
>> Iterator[R00tJsonObject],
>>   oldState: GroupState[MyState]): OutputRow = {
>>
>> println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + 
>> tuple3._2 + ", " + tuple3._3)
>> var state: MyState = if (oldState.exists) oldState.get else 
>> MyState(tuple3._1, tuple3._2, tuple3._3)
>>
>> if (oldState.hasTimedOut) {
>>   println("@ oldState has timed out ")
>>   // Logic to Write OutputRow
>>   OutputRow("some values here...")
>> } else {
>>   for (input <- inputs) {
>> state = updateWithEvent(state, input)
>> oldState.update(state)
>> *oldState.setTimeoutDuration("2 minutes")*
>>   }
>>   OutputRow(null, null, null)
>> }
>>
>>   }
>>
>> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as 
>> follows...
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
>>
>> But 'hasTimedOut' is never true so I don't get any output! What am I doing 
>> wrong?
>>
>>
>>
>>


Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-04 Thread Tathagata Das
Make sure that you are continuously feeding data into the query to trigger
the batches. only then timeouts are processed.
See the timeout behavior details here -
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState

On Wed, Mar 4, 2020 at 2:51 PM Something Something 
wrote:

> I've set the timeout duration to "2 minutes" as follows:
>
> def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: 
> Iterator[R00tJsonObject],
>   oldState: GroupState[MyState]): OutputRow = {
>
> println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + 
> tuple3._2 + ", " + tuple3._3)
> var state: MyState = if (oldState.exists) oldState.get else 
> MyState(tuple3._1, tuple3._2, tuple3._3)
>
> if (oldState.hasTimedOut) {
>   println("@ oldState has timed out ")
>   // Logic to Write OutputRow
>   OutputRow("some values here...")
> } else {
>   for (input <- inputs) {
> state = updateWithEvent(state, input)
> oldState.update(state)
> *oldState.setTimeoutDuration("2 minutes")*
>   }
>   OutputRow(null, null, null)
> }
>
>   }
>
> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as 
> follows...
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
>
> But 'hasTimedOut' is never true so I don't get any output! What am I doing 
> wrong?
>
>
>
>


Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-04 Thread Something Something
I've set the timeout duration to "2 minutes" as follows:

def updateAcrossEvents (tuple3: Tuple3[String, String, String],
inputs: Iterator[R00tJsonObject],
  oldState: GroupState[MyState]): OutputRow = {

println(" Inside updateAcrossEvents with : " + tuple3._1 + ",
" + tuple3._2 + ", " + tuple3._3)
var state: MyState = if (oldState.exists) oldState.get else
MyState(tuple3._1, tuple3._2, tuple3._3)

if (oldState.hasTimedOut) {
  println("@ oldState has timed out ")
  // Logic to Write OutputRow
  OutputRow("some values here...")
} else {
  for (input <- inputs) {
state = updateWithEvent(state, input)
oldState.update(state)
*oldState.setTimeoutDuration("2 minutes")*
  }
  OutputRow(null, null, null)
}

  }

I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as
follows...

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)

But 'hasTimedOut' is never true so I don't get any output! What am I
doing wrong?


Re: Stateful Spark Streaming: Required attribute 'value' not found

2020-03-04 Thread Something Something
By simply adding 'toJSON' before 'writeStream' the problem was fixed. Maybe
it will help someone.

On Tue, Mar 3, 2020 at 6:02 PM Something Something 
wrote:

> In a Stateful Spark Streaming application I am writing the 'OutputRow' in
> the 'updateAcrossEvents' but I keep getting this error (*Required
> attribute 'value' not found*) while it's trying to write to Kafka. I know
> from the documentation that 'value' attribute needs to be set but how do I
> do that in the 'Stateful Structured Streaming'? Where & how do I add this
> 'value' attribute in the following code? *Note: I am using Spark 2.3.1*
>
> withEventTime
>   .as[R00tJsonObject]
>   .withWatermark("event_time", "5 minutes")
>   .groupByKey(row => (row.value.Id, row.value.time.toString, 
> row.value.cId))
>   
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
>   .writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("topic", "myTopic")
>   .option("checkpointLocation", "/Users/username/checkpointLocation")
>   .outputMode("update")
>   .start()
>   .awaitTermination()
>
>


Stateful Spark Streaming: Required attribute 'value' not found

2020-03-03 Thread Something Something
In a Stateful Spark Streaming application I am writing the 'OutputRow' in
the 'updateAcrossEvents' but I keep getting this error (*Required attribute
'value' not found*) while it's trying to write to Kafka. I know from the
documentation that 'value' attribute needs to be set but how do I do that
in the 'Stateful Structured Streaming'? Where & how do I add this 'value'
attribute in the following code? *Note: I am using Spark 2.3.1*

withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
  
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()


Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I changed it to Tuple2 and that problem is solved.

Any thoughts on this message

*Unapplied methods are only converted to functions when a function type is
expected.*

*You can make this conversion explicit by writing `updateAcrossEvents _` or
`updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*

On Mon, Mar 2, 2020 at 5:12 PM lec ssmi  wrote:

> maybe you can combine the fields you want to use into one field
>
> Something Something  于2020年3月3日周二 上午6:37写道:
>
>> I am writing a Stateful Streaming application in which I am using
>> mapGroupsWithState to create aggregates for Groups but I need to create 
>> *Groups
>> based on more than one column in the Input Row*. All the examples in the
>> 'Spark: The Definitive Guide' use only one column such as 'User' or
>> 'Device'. I am using code similar to what's given below. *How do I
>> specify more than one field in the 'groupByKey'?*
>>
>> There are other challenges as well. The book says we can use
>> 'updateAcrossEvents' the way given below but I get compile time error
>> saying:
>>
>>
>> *Error:(43, 65) missing argument list for method updateAcrossEvents in
>> object MainUnapplied methods are only converted to functions when a
>> function type is expected.You can make this conversion explicit by writing
>> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
>> `updateAcrossEvents`.
>> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
>>
>> Another challenge: Compiler also complains about the my *MyReport*: 
>> *Error:(41,
>> 12) Unable to find encoder for type stored in a Dataset.  Primitive types
>> (Int, String, etc) and Product types (case classes) are supported by
>> importing spark.implicits._  Support for serializing other types will be
>> added in future releases.*
>>
>> Help in resolving these errors would be greatly appreciated. Thanks in
>> advance.
>>
>>
>> withEventTime
>> .as[MyReport]
>>   .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
>>   
>> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
>>   .writeStream
>>   .queryName("test_query")
>>   .format("memory")
>>   .outputMode("update")
>>   .start()
>>
>>


Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread lec ssmi
maybe you can combine the fields you want to use into one field

Something Something  于2020年3月3日周二 上午6:37写道:

> I am writing a Stateful Streaming application in which I am using
> mapGroupsWithState to create aggregates for Groups but I need to create 
> *Groups
> based on more than one column in the Input Row*. All the examples in the
> 'Spark: The Definitive Guide' use only one column such as 'User' or
> 'Device'. I am using code similar to what's given below. *How do I
> specify more than one field in the 'groupByKey'?*
>
> There are other challenges as well. The book says we can use
> 'updateAcrossEvents' the way given below but I get compile time error
> saying:
>
>
> *Error:(43, 65) missing argument list for method updateAcrossEvents in
> object MainUnapplied methods are only converted to functions when a
> function type is expected.You can make this conversion explicit by writing
> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
> `updateAcrossEvents`.
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
>
> Another challenge: Compiler also complains about the my *MyReport*: 
> *Error:(41,
> 12) Unable to find encoder for type stored in a Dataset.  Primitive types
> (Int, String, etc) and Product types (case classes) are supported by
> importing spark.implicits._  Support for serializing other types will be
> added in future releases.*
>
> Help in resolving these errors would be greatly appreciated. Thanks in
> advance.
>
>
> withEventTime
> .as[MyReport]
>   .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
>   
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
>   .writeStream
>   .queryName("test_query")
>   .format("memory")
>   .outputMode("update")
>   .start()
>
>


Spark Streaming with mapGroupsWithState

2020-03-02 Thread Something Something
I am writing a Stateful Streaming application in which I am using
mapGroupsWithState to create aggregates for Groups but I need to create *Groups
based on more than one column in the Input Row*. All the examples in the
'Spark: The Definitive Guide' use only one column such as 'User' or
'Device'. I am using code similar to what's given below. *How do I specify
more than one field in the 'groupByKey'?*

There are other challenges as well. The book says we can use
'updateAcrossEvents' the way given below but I get compile time error
saying:


*Error:(43, 65) missing argument list for method updateAcrossEvents in
object MainUnapplied methods are only converted to functions when a
function type is expected.You can make this conversion explicit by writing
`updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
`updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*

Another challenge: Compiler also complains about the my *MyReport*: *Error:(41,
12) Unable to find encoder for type stored in a Dataset.  Primitive types
(Int, String, etc) and Product types (case classes) are supported by
importing spark.implicits._  Support for serializing other types will be
added in future releases.*

Help in resolving these errors would be greatly appreciated. Thanks in
advance.


withEventTime
.as[MyReport]
  .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()


Aggregating values by a key field in Spark Streaming

2020-02-28 Thread Something Something
Here's my use case: Messages are coming into a Kafka Topic for different
'Events'. Each event has a unique Event Id. I need to aggregate counts for
each Event AFTER the event is completed. For now, we are thinking we can
assume an event is completed if there are no more messages coming in for a
period of X minutes. How do I do this using Spark Streaming? I am reading
up on 'Stateful Transformations' with 'mapWithState'. Is that the right
approach? Any sample code would be even more appreciated. Thanks.


Re: Spark Streaming: Aggregating values across batches

2020-02-27 Thread Tathagata Das
Use Structured Streaming. Its aggregation, by definition, is across batches.

On Thu, Feb 27, 2020 at 3:17 PM Something Something <
mailinglist...@gmail.com> wrote:

> We've a Spark Streaming job that calculates some values in each batch.
> What we need to do now is aggregate values across ALL batches. What is the
> best strategy to do this in Spark Streaming. Should we use 'Spark
> Accumulators' for this?
>


Spark Streaming: Aggregating values across batches

2020-02-27 Thread Something Something
We've a Spark Streaming job that calculates some values in each batch. What
we need to do now is aggregate values across ALL batches. What is the best
strategy to do this in Spark Streaming. Should we use 'Spark Accumulators'
for this?


Spark Streaming job having issue with Java Flight Recorder (JFR)

2020-02-20 Thread Pramod Biligiri
Hi,
Has anyone successfully used Java Flight Recorder (JFR) with Spark
Streaming on Oracle Java 8? JFR works for me on batch jobs but not with
Streaming.

I'm running my streaming job on Amazon EMR. I have enabled Java Flight
Recorder (JFR) to profile CPU usage. But at the end of the job, the JFR
output files get deleted and are not available. This is the error in the
JFR trace logs:
[jfr][DEBUG][880.590] Release chunk
/mnt/tmp/2020_02_19_17_26_04_8823/2020_02_19_17_40_45_8823_0.jfr.part ref
count now 0
[jfr][DEBUG][880.590] Repository chunk
/mnt/tmp/2020_02_19_17_26_04_8823/2020_02_19_17_40_45_8823_0.jfr.part
deleted.
[jfr][DEBUG][880.590] Repository chunk
/mnt/tmp/2020_02_19_17_26_04_8823/2020_02_19_17_40_45_8823_0.jfr deleted.
[jfr][INFO ][880.593] Stopped recording [id=1,
name=dump-on-exit-clone-of-0, start=Wed Feb 19 17:26:06 UTC 2020, end=Wed
Feb 19 17:40:45 UTC 2020]
[jfr][ERROR][880.593] Could not copy to
/tmp/hotspot-pid-8823-id-0-2020_02_19_17_40_45.jfr
[jfr][ERROR][880.593] java.io.FileNotFoundException:No chunks
[jfr][ERROR][880.593]
 oracle.jrockit.jfr.ChunksChannel.(ChunksChannel.java:26)
[jfr][ERROR][880.593]
 oracle.jrockit.jfr.Recording.copyTo(Recording.java:404)
[jfr][ERROR][880.593]
 oracle.jrockit.jfr.JFRImpl.dumpOnExit(JFRImpl.java:802)
[jfr][ERROR][880.593]
 oracle.jrockit.jfr.JFRImpl.dumpOnExit(JFRImpl.java:789)
[jfr][ERROR][880.593]
 oracle.jrockit.jfr.JFRImpl.destroy(JFRImpl.java:767)
[jfr][ERROR][880.593]oracle.jrockit.jfr.JFR$2.run(JFR.java:208)
[jfr][ERROR][880.593]java.lang.Thread.run(Thread.java:748)
[jfr][ERROR][880.593] Could not delete
/tmp/hotspot-pid-8823-id-0-2020_02_19_17_40_45.jfr
[jfr][ERROR][880.593] java.io.FileNotFoundException:No chunks
[jfr][ERROR][880.593]oracle.jrockit.jfr
The rest of the trace can be found in the attachment (stdout.txt).
Also, I've not changed the default value of "
spark.streaming.stopGracefullyOnShutdown" (its default value is false)

The Spark driver and executor JVMs are launched with the following options:
-XX:+UnlockCommercialFeatures -XX:+FlightRecorder
-XX:FlightRecorderOptions=defaultrecording=true,disk=true,repository=/tmp,maxage=2h,dumponexit=true,dumponexitpath=/tmp,loglevel=trace

Pramod
[jfr][INFO ][0.000] JFR log level set. Log level now at [TRACE]
[jfr][TRACE][0.001] Jfr::initialize_stage_one completed
[jfr][TRACE][0.093] Loaded JFR library
[jfr][INFO ][0.125] Using /mnt/tmp/2020_02_19_17_26_04_8823 as Flight Recorder 
repository.
[jfr][TRACE][0.157] Default settings loaded.
[jfr][TRACE][0.164] VMJFR created.
[jfr][TRACE][0.239] classFileLoadHook called for java/lang/Throwable
[jfr][TRACE][0.239] instrumenting java.lang.Throwable
[jfr][TRACE][0.245] classFileLoadHook called for java/lang/Error
[jfr][TRACE][0.245] instrumenting java.lang.Error
[jfr][TRACE][0.247] RetransformClasses successful
[jfr][TRACE][0.270] Processing instrumentation class: class 
oracle.jrockit.jfr.FileInputStreamInstrumentor
[jfr][TRACE][0.275] Inliner processing method read()I
[jfr][TRACE][0.275] MethodCallInliner: 
targetMethod=oracle.jrockit.jfr.FileInputStreamInstrumentor.read()I
[jfr][TRACE][0.275] Inlining call to read()I
[jfr][TRACE][0.275] Inlining done
[jfr][TRACE][0.275] Inlining call to read()I
[jfr][TRACE][0.275] Inlining done
[jfr][TRACE][0.275] Inliner processing method read([B)I
[jfr][TRACE][0.275] MethodCallInliner: 
targetMethod=oracle.jrockit.jfr.FileInputStreamInstrumentor.read([B)I
[jfr][TRACE][0.276] Inlining call to read([B)I
[jfr][TRACE][0.276] Inlining done
[jfr][TRACE][0.276] Inlining call to read([B)I
[jfr][TRACE][0.276] Inlining done
[jfr][TRACE][0.276] Inliner processing method read([BII)I
[jfr][TRACE][0.276] MethodCallInliner: 
targetMethod=oracle.jrockit.jfr.FileInputStreamInstrumentor.read([BII)I
[jfr][TRACE][0.276] Inlining call to read([BII)I
[jfr][TRACE][0.276] Inlining done
[jfr][TRACE][0.276] Inlining call to read([BII)I
[jfr][TRACE][0.276] Inlining done
[jfr][TRACE][0.279] Deleting read([B)I
[jfr][TRACE][0.279] Deleting read()I
[jfr][TRACE][0.279] Deleting read([BII)I
[jfr][TRACE][0.280] Copying method: read()I
[jfr][TRACE][0.280]with mapper: 
{oracle/jrockit/jfr/FileInputStreamInstrumentor=java/io/FileInputStream}
[jfr][TRACE][0.282] Copying method: read([B)I
[jfr][TRACE][0.282]with mapper: 
{oracle/jrockit/jfr/FileInputStreamInstrumentor=java/io/FileInputStream}
[jfr][TRACE][0.282] Copying method: read([BII)I
[jfr][TRACE][0.282]with mapper: 
{oracle/jrockit/jfr/FileInputStreamInstrumentor=java/io/FileInputStream}
[jfr][TRACE][0.284] Processing instrumentation class: class 
oracle.jrockit.jfr.FileOutputStreamInstrumentor
[jfr][TRACE][0.285] Inliner processing method write(I)V
[jfr][TRACE][0.285] MethodCallInliner: 
targetMethod=oracle.jrockit.jfr.FileOutputStreamInstrumentor.write(I)V
[jfr][TRACE][0.285] Inlining call to write(I)V
[jfr][TRACE][0.285] Inlining done
[jfr][TRACE][0.285] Inlining call to write(I)V
[jfr][TRACE][0.285] Inlining done
[jfr][TRACE][0.286] Inliner process

[ spark-streaming ] - Data Locality issue

2020-02-04 Thread Karthik Srinivas
Hi,

I am using spark 2.3.2, i am facing issues due to data locality, even after
giving spark.locality.wait.rack=200, locality_level is always RACK_LOCAL,
can someone help me with this.

Thank you


Fwd: [Spark Streaming]: Why my Spark Direct stream is sending multiple offset commits to Kafka?

2020-01-06 Thread Raghu B
Hi Spark Community.

I need help with the following issue and I have been researching about it
from last 2 weeks and as a last and best resource I want to ask the Spark
community.

I am running the following code in Spark*

*  val sparkConf = new SparkConf()*

*.setMaster("local[*]")*

*.setAppName("KafkaTest")*

*.set("spark.streaming.kafka.maxRatePerPartition","10")*

*.set("spark.default.parallelism","10")*

*.set("spark.streaming.backpressure.enabled", "true")*

*.set("spark.scheduler.mode", "FAIR")*



*  lazy val sparkContext = new SparkContext(sparkConf)*

*  val sparkJob = new SparkLocal*



*  val kafkaParams = Map[String, Object](*

*  "bootstrap.servers" -> "kafka-270894369.spark.google.com:9092
",*

*  "key.deserializer" -> classOf[StringDeserializer],*

*  "value.deserializer" -> classOf[StringDeserializer],*

*  "group.id " -> "stream_group1",*

*  "auto.offset.reset" -> "latest",*

*  "enable.auto.commit" -> "false",*

*  "heartbeat.interval.ms " ->
"13", //3000*

*  "request.timeout.ms " -> "15",
//4*

*  "session.timeout.ms " -> "14",
//3*

*  "max.poll.interval.ms " ->
"14", //isn't a known config*

*  "max.poll.records" -> "100" //2147483647*

*)*



*val streamingContext = new StreamingContext(sparkContext,
Seconds(120))*



*val topics = Array("topicname")*



*val kafkaStream = KafkaUtils.createDirectStream[String, String](*

*  streamingContext,*

*  PreferConsistent,*

*  Subscribe[String, String](topics, kafkaParams)*

*)*



*def messageTuple(tuple: ConsumerRecord[String, String]): (String)
= {*

*  (null) // Removed the code*

*}*



*var offset : Array[OffsetRange] = null*



*kafkaStream.foreachRDD{rdd =>*

*  val offsetRanges =
rdd.asInstanceOf[HasOffsetRanges].offsetRanges*

*  offset = offsetRanges*



*  rdd.map(row => messageTuple(row))*

*.foreachPartition { partition =>*

*  partition.map(row => null)*

*.foreach{ record =>*

*  print("")*

*  Thread.sleep(5)*

*}*

*  }*

*
kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)*

*  }*



*streamingContext.start()*

*streamingContext.awaitTerminationOrTimeout(600)*



*sys.ShutdownHookThread{*

*  println("Gracefully shutting down App")*

*  streamingContext.stop(true,true)*

*  println("Application stopped")*

*}*




With the above code I am observing multiple commits are sending to Kafka
and I am not sure why ?

(Got the below info from kafka __consumer_offset topic)













*  [stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=157773011,
expireTimestamp=Some(1577816400011))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=157773012,
expireTimestamp=Some(1577816400012))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864005827,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=157773079,
expireTimestamp=Some(1577816400079))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120008,
expireTimestamp=Some(1577816520008))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120010,
expireTimestamp=Some(1577816520010))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120077,
expireTimestamp=Some(1577816520077))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240010,
expireTimestamp=Some(1577816640010))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240015,
expireTimestamp=Some(1577816640015))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240137,
expireTimestamp=Some(1577816640137))*

*[stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=157773012,
expireTimestamp=Some(1577816400012))*
*[stream_group1,topicname,59]::OffsetAndMetadata(offset=864005827,
leaderEpoch=Optional.empty, metadata=, commitTimestamp=157773079,
expireTimestamp=Some(1577816400079))*

*   

Spark streaming when a node or nodes go down

2019-12-11 Thread Mich Talebzadeh
Hi,

I know this is a basic question but someone enquired about it and I just
wanted to fill my knowledge gap so to speak.

Within the context of Spark streaming, the RDD is created from the incoming
topic and RDD is partitioned and each node of Spark is operating on a
partition at that time. OK This series of operations are merged together
and create a DAG. That means DAG keeps track of operations performed?

If a node goes down, the driver (application master) knows about it. Then,
it tries to assign another node to continue the processing at the same
place on that partition of RDD. This works but with reduced performance.
However, to be able to handle the lost partition,* the data has to be
available to all nodes from the beginning.* So we are talking about spark
streaming not spark reading from HDFS, Hive table etc. The assumption is
that the streaming data is cached in every single node? Is that correct!

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: spark streaming exception

2019-11-10 Thread Akshay Bhardwaj
Hi,

Could you provide with the code snippet of how you are connecting and
reading data from kafka?

Akshay Bhardwaj
+91-97111-33849


On Thu, Oct 17, 2019 at 8:39 PM Amit Sharma  wrote:

> Please update me if any one knows about it.
>
>
> Thanks
> Amit
>
> On Thu, Oct 10, 2019 at 3:49 PM Amit Sharma  wrote:
>
>> Hi , we have spark streaming job to which we send a request through our
>> UI using kafka. It process and returned the response. We are getting below
>> error and this stareming is not processing any request.
>>
>> Listener StreamingJobProgressListener threw an exception
>> java.util.NoSuchElementException: key not found: 1570689515000 ms
>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>> at scala.collection.AbstractMap.default(Map.scala:59)
>> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
>> at
>> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29).
>>
>> Please help me in find out the root cause of this issue.
>>
>


[Spark Streaming] Apply multiple ML pipelines(Models) to the same stream

2019-10-31 Thread Spico Florin
Hello!

I have an use case where I have to apply multiple already trained models
(e.g. M1, M2, ..Mn) on the same spark stream ( fetched from kafka).

The models were trained usining the isolation forest algorithm from here:
https://github.com/titicaca/spark-iforest


I have found something similar with my case here
https://www.youtube.com/watch?v=EhRHQPCdldI, but unfortunately I don't know
if the company Genesys (former AltoCloud) made this API  (StreamPipeline,
Heterogenous Pipeline ) open source.

I handled this with the above schema code, but I don't know how optimal is.

//read the stream
val kafkaStreamDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", "topic")
  .load
val myModels = Array("m1", "m2","m3","m4")
//parallize the input models in order to have multiple threads handling the
same stream, otherwise blocked??
 myModels.par.foreach(lm => {

 //load the model
 val model = PipelineModel.load(lm)

  kafkaStreamDF.writeStream.foreachBatch({ (batchDF: DataFrame,
batchId: Long) =>
//apply model
val pdf =
model.transform(batchDF).selectExpr("CAST(to_json(struct(*)) AS STRING) AS
value").write
  .format("json")
  .save("anom/" + lm +  System.currentTimeMillis())
  }).start().awaitTermination()
})

Questions:
1. Therefore, I would like to know if there is any any Spark API for
handling such an use case?

2. If yes, where can I find it?

3. If no, how can I optimally implement this?

Any idea, suggestions is highly appreciated.

Thanks.
 Florin


Re: spark streaming exception

2019-10-17 Thread Amit Sharma
Please update me if any one knows about it.


Thanks
Amit

On Thu, Oct 10, 2019 at 3:49 PM Amit Sharma  wrote:

> Hi , we have spark streaming job to which we send a request through our UI
> using kafka. It process and returned the response. We are getting below
> error and this stareming is not processing any request.
>
> Listener StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 1570689515000 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at
> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
> at
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
> at
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29).
>
> Please help me in find out the root cause of this issue.
>


Semantics of Manual Offset Commit for Kafka Spark Streaming

2019-10-14 Thread Andre Piwoni
When using manual Kafka offset commit in Spark streaming  job and application 
fails to process current batch without committing offset in executor, is it 
expected behavior that next batch will be processed and offset will be moved to 
next batch regardless of application failure to commit? It seems so based on 
glance at the code. If so, is it expected that job termination upon failure to 
process batch and commit offset should resume from last committed offset?

I’m asking since until now I didn’t have to deal with Spark streaming from 
Kafka where assumption was “successfully processed  at-least-once”. Stopping 
Kafka processing or streaming on any application failure may seem rather 
extreme but it is what it is.

Thank you,
Andre


spark streaming exception

2019-10-10 Thread Amit Sharma
Hi , we have spark streaming job to which we send a request through our UI
using kafka. It process and returned the response. We are getting below
error and this stareming is not processing any request.

Listener StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 1570689515000 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at
org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
at
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29).

Please help me in find out the root cause of this issue.


<    1   2   3   4   5   6   7   8   9   10   >