Re: spark streaming exception

2019-11-10 Thread Akshay Bhardwaj
Hi,

Could you provide the code snippet of how you are connecting to and
reading data from Kafka?

Akshay Bhardwaj
+91-97111-33849


On Thu, Oct 17, 2019 at 8:39 PM Amit Sharma  wrote:

> Please update me if any one knows about it.
>
>
> Thanks
> Amit
>
> On Thu, Oct 10, 2019 at 3:49 PM Amit Sharma  wrote:
>
>> Hi, we have a spark streaming job to which we send a request through our
>> UI using Kafka. It processes the request and returns the response. We are getting
>> the below error and the streaming job is not processing any requests.
>>
>> Listener StreamingJobProgressListener threw an exception
>> java.util.NoSuchElementException: key not found: 1570689515000 ms
>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>> at scala.collection.AbstractMap.default(Map.scala:59)
>> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
>> at
>> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29).
>>
>> Please help me find out the root cause of this issue.
>>
>


Re: driver crashes - need to find out why driver keeps crashing

2019-10-23 Thread Akshay Bhardwaj
Hi,

Were you able to check the executor logs for this? If executors are
running in separate JVMs/machines, they will have log files separate from
the driver's. If the OOME is due to the concatenation of the large string, it may
be reported in the executor logs first.

How are you running this spark job?
Standalone spark process (master set to local[*])?
Spark master-slave cluster?
YARN or Mesos cluster, etc.?
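
If it helps to surface the OOME, heap dumps on OOM can be enabled for the
executor JVMs; a minimal sketch is below (the dump path and app name are
assumptions; the equivalent driver-side flag has to be passed at launch time,
e.g. via spark-submit, since the driver JVM is already running when
application code executes):

import org.apache.spark.sql.SparkSession

// Executor JVMs are launched after this conf is read, so the flag takes effect there.
val spark = SparkSession.builder()
  .appName("oom-debug-example")  // hypothetical app name
  .config("spark.executor.extraJavaOptions",
    "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/executor-heapdump.hprof")
  .getOrCreate()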


Akshay Bhardwaj
+91-97111-33849


On Mon, Oct 21, 2019 at 11:20 AM Manuel Sopena Ballesteros <
manuel...@garvan.org.au> wrote:

> Dear Apache Spark community,
>
>
>
> My spark driver crashes and the logs do not give enough explanation of why
> it happens:
>
>
>
> INFO [2019-10-21 16:33:37,045] ({pool-6-thread-7}
> SchedulerFactory.java[jobStarted]:109) - Job 20190926-163704_913596201
> started by scheduler interpreter_2100843352
>
> DEBUG [2019-10-21 16:33:37,046] ({pool-6-thread-7}
> RemoteInterpreterServer.java[jobRun]:632) - Script after hooks: a =
> "bigword"
>
> b = "bigword"
>
> print(a)
>
>
>
> for i in range(1000):
>
> a += b
>
>
>
> print(a)
>
> __zeppelin__._displayhook()
>
> DEBUG [2019-10-21 16:33:37,046] ({pool-6-thread-7}
> RemoteInterpreterEventClient.java[sendEvent]:413) - Send Event:
> RemoteInterpreterEvent(type:META_INFOS, data:{"message":"Spark UI
> enabled","url":"http://r640-1-10-mlx.mlx:38863"})
>
> DEBUG [2019-10-21 16:33:37,048] ({pool-5-thread-1}
> RemoteInterpreterEventClient.java[pollEvent]:366) - Send event META_INFOS
>
> DEBUG [2019-10-21 16:33:37,054] ({Thread-34}
> RemoteInterpreterServer.java[onUpdate]:799) - Output Update:
>
> DEBUG [2019-10-21 16:33:37,054] ({Thread-34}
> RemoteInterpreterEventClient.java[sendEvent]:413) - Send Event:
> RemoteInterpreterEvent(type:OUTPUT_UPDATE,
> data:{"data":"","index":"0","noteId":"2ENM9X82N","paragraphId":"20190926-163704_913596201","type":"TEXT"})
>
> DEBUG [2019-10-21 16:33:37,054] ({Thread-34}
> RemoteInterpreterServer.java[onAppend]:789) - Output Append: bigword
>
>
>
> DEBUG [2019-10-21 16:33:37,054] ({pool-5-thread-1}
> RemoteInterpreterEventClient.java[pollEvent]:366) - Send event OUTPUT_UPDATE
>
> DEBUG [2019-10-21 16:33:37,054] ({Thread-34}
> RemoteInterpreterEventClient.java[sendEvent]:413) - Send Event:
> RemoteInterpreterEvent(type:OUTPUT_APPEND,
> data:{"data":"bigword\n","index":"0","noteId":"2ENM9X82N","paragraphId":"20190926-163704_913596201"})
>
> DEBUG [2019-10-21 16:33:37,062] ({pool-5-thread-1}
> RemoteInterpreterEventClient.java[pollEvent]:366) - Send event OUTPUT_APPEND
>
> DEBUG [2019-10-21 16:33:37,145] ({pool-5-thread-3}
> Interpreter.java[getProperty]:222) - key: zeppelin.spark.concurrentSQL,
> value: false
>
> DEBUG [2019-10-21 16:33:37,145] ({pool-5-thread-3}
> Interpreter.java[getProperty]:222) - key: zeppelin.spark.concurrentSQL,
> value: false
>
> DEBUG [2019-10-21 16:33:37,225] ({pool-5-thread-3}
> RemoteInterpreterServer.java[resourcePoolGetAll]:1089) - Request getAll
> from ZeppelinServer
>
> ERROR [2019-10-21 16:33:40,357] ({SIGTERM handler}
> SignalUtils.scala[apply$mcZ$sp]:43) - RECEIVED SIGNAL TERM
>
> INFO [2019-10-21 16:33:40,530] ({shutdown-hook-0}
> Logging.scala[logInfo]:54) - Invoking stop() from shutdown hook
>
>
>
> I assume it is because the JVM runs out of memory, but I would expect an
> error saying so.
>
>
>
> Any idea?
>
>
>
> Thank you very much
> NOTICE
> Please consider the environment before printing this email. This message
> and any attachments are intended for the addressee named and may contain
> legally privileged/confidential/copyright information. If you are not the
> intended recipient, you should not read, use, disclose, copy or distribute
> this communication. If you have received this message in error please
> notify us at once by return email and then delete both messages. We accept
> no liability for the distribution of viruses or similar in electronic
> communications. This notice should not be removed.
>


Re: Why my spark job STATE--> Running FINALSTATE --> Undefined.

2019-06-11 Thread Akshay Bhardwaj
Hi Shyam,

It would be good if you mention what you are using as the --master URL. Is
it running on YARN, Mesos or a Spark cluster?

However, I faced such an issue in my earlier trials with spark, in which I
created connections to a lot of external databases like Cassandra within
the driver (the main program of my app).
After the job completed, my main program/driver task never finished; after
debugging I found the reason to be open sessions with Cassandra.
Closing those connections at the end of my main program helped resolve
the problem. As you can guess, this issue was independent of the
cluster manager used.
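
For illustration, a minimal sketch of the fix described above, assuming the
DataStax Java driver (the class names and host are assumptions, not from the
original thread): close any external sessions before the driver's main()
returns.

import com.datastax.driver.core.Cluster
import org.apache.spark.sql.SparkSession

object JobMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("example-job").getOrCreate()
    val cluster = Cluster.builder().addContactPoint("cassandra-host").build() // hypothetical host
    val session = cluster.connect()
    try {
      // ... run the Spark job and any Cassandra lookups here ...
    } finally {
      // Without these, non-daemon connection threads can keep the driver JVM
      // alive even after the Spark job itself has completed.
      session.close()
      cluster.close()
      spark.stop()
    }
  }
}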


Akshay Bhardwaj
+91-97111-33849


On Tue, Jun 11, 2019 at 7:41 PM Shyam P  wrote:

> Hi,
> Any clue why spark job goes into UNDEFINED state ?
>
> More detail are in the url.
>
> https://stackoverflow.com/questions/56545644/why-my-spark-sql-job-stays-in-state-runningfinalstatus-undefined
>
>
> Appreciate your help.
>
> Regards,
> Shyam
>


Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Akshay Bhardwaj
Additionally, there is a "uuid" function available as well, if that helps your
use case.
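
For illustration, a minimal sketch of the approach described in the quoted
reply below, using Marcelo's column names (the variable names are assumptions;
monotonically_increasing_id gives unique but not consecutive ids):

import org.apache.spark.sql.functions.{monotonically_increasing_id, expr}

// Hypothetical: denormalizedCities is the dataframe with COUNTRY, CITY, CITY_NICKNAME.
val countries = denormalizedCities
  .select("COUNTRY").distinct()
  .withColumn("COUNTRY_ID", monotonically_increasing_id())

val cities = denormalizedCities
  .join(countries, Seq("COUNTRY"))
  .withColumn("ID", expr("uuid()"))  // or another monotonically_increasing_id()
  .select("COUNTRY_ID", "ID", "CITY", "CITY_NICKNAME")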


Akshay Bhardwaj
+91-97111-33849


On Thu, Jun 6, 2019 at 3:18 PM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi Marcelo,
>
> If you are using spark 2.3+ and the dataset API/SparkSQL, you can use the
> inbuilt function "monotonically_increasing_id" in Spark.
> A little tweaking using Spark SQL inbuilt functions can enable you to
> achieve this without having to write code or define RDDs with map/reduce
> functions.
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
> wrote:
>
>> Hi all,
>>
>> I am new to spark and I am trying to write an application using
>> dataframes that normalize data.
>>
>> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
>> CITY, CITY_NICKNAME
>>
>> Here is what I want to do:
>>
>>
>>1. Map by country, then for each country generate a new ID and write
>>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>>country ID would be generated, probably using 
>> `monotonically_increasing_id`.
>>2. For each country, write several lines on a new dataframe `cities`,
>>which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be
>>the same generated on country table and ID would be another ID I generate.
>>
>> What's the best way to do this, hopefully using only dataframes (no low
>> level RDDs) unless it's not possible?
>>
>> I clearly see a MAP/Reduce process where for each KEY mapped I generate a
>> row in countries table with COUNTRY_ID and for every value I write a row in
>> cities table. But how to implement this in an easy and efficient way?
>>
>> I thought about using a `GroupBy Country` and then using `collect` to
>> collect all values for that country, but then I don't know how to generate
>> the country id and I am not sure about memory efficiency of `collect` for a
>> country with too many cities (bare in mind country/city is just an example,
>> my real entities are different).
>>
>> Could anyone point me to the direction of a good solution?
>>
>> Thanks,
>> Marcelo.
>>
>> This email is confidential [and may be protected by legal privilege]. If
>> you are not the intended recipient, please do not copy or disclose its
>> content but contact the sender immediately upon receipt.
>>
>> KTech Services Ltd is registered in England as company number 10704940.
>>
>> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
>> United Kingdom
>>
>


Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Akshay Bhardwaj
Hi Marcelo,

If you are using spark 2.3+ and the dataset API/SparkSQL, you can use the
inbuilt function "monotonically_increasing_id" in Spark.
A little tweaking using Spark SQL inbuilt functions can enable you to
achieve this without having to write code or define RDDs with map/reduce
functions.

Akshay Bhardwaj
+91-97111-33849


On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
wrote:

> Hi all,
>
> I am new to spark and I am trying to write an application using dataframes
> that normalize data.
>
> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
> CITY, CITY_NICKNAME
>
> Here is what I want to do:
>
>
>1. Map by country, then for each country generate a new ID and write
>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>country ID would be generated, probably using 
> `monotonically_increasing_id`.
>2. For each country, write several lines on a new dataframe `cities`,
>which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be
>the same generated on country table and ID would be another ID I generate.
>
> What's the best way to do this, hopefully using only dataframes (no low
> level RDDs) unless it's not possible?
>
> I clearly see a MAP/Reduce process where for each KEY mapped I generate a
> row in countries table with COUNTRY_ID and for every value I write a row in
> cities table. But how to implement this in an easy and efficient way?
>
> I thought about using a `GroupBy Country` and then using `collect` to
> collect all values for that country, but then I don't know how to generate
> the country id and I am not sure about memory efficiency of `collect` for a
> country with too many cities (bare in mind country/city is just an example,
> my real entities are different).
>
> Could anyone point me to the direction of a good solution?
>
> Thanks,
> Marcelo.
>
> This email is confidential [and may be protected by legal privilege]. If
> you are not the intended recipient, please do not copy or disclose its
> content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
> United Kingdom
>


Re: Executors idle, driver heap exploding and maxing only 1 cpu core

2019-05-29 Thread Akshay Bhardwaj
Hi,

A few thoughts to add to Nicholas' apt reply.

We were loading multiple files from AWS S3 in our Spark application. When
the step that loads the files is called, the driver spends significant time
fetching the exact paths of the files from AWS S3,
especially because we specified S3 paths as wildcard strings (e.g.
s3a://bucket-name/folder1/data1/2019-05*/*, meaning that I want to
reference all sub-files/folders for the month of May 2019).

At that time I was able to verify this by running the "iftop" Linux
command, which showed a lot of network calls to *s3.amazon.com* servers.

This phenomenon occurs as soon as I define the load-files transformation,
even when no save/collect action has been called in my spark pipeline.
The Spark UI does not show any stage in running mode either, and only
once all the network calls to AWS S3 are completed does the Spark UI show
that the call to load files completed in 2 seconds.
My spark job "seemed to be paused" for over half an hour, depending upon the
number of files. I believe this happens due to the underlying AWS SDK/Azure
SDK libraries that Spark uses: they need to fetch the exact file paths in the
object stores before the files can be referenced in Spark.


As you mention you are using Azure blob files, this should explain the
behaviour where everything seems to stop. You can reduce this time by
ensuring you have a small number of large files in your blob store to read
from, rather than the reverse.
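
For illustration, a load of the kind described above (the glob path is the
example pattern from earlier; parquet is an assumed format). Merely defining
it already triggers the object-store listing on the driver:

val may2019 = spark.read.parquet("s3a://bucket-name/folder1/data1/2019-05*/*")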

Akshay Bhardwaj
+91-97111-33849


On Thu, May 23, 2019 at 11:13 PM Nicholas Hakobian <
nicholas.hakob...@rallyhealth.com> wrote:

> One potential case that can cause this is the optimizer being a little
> overzealous with determining if a table can be broadcasted or not. Have you
> checked the UI or query plan to see if any steps include a
> BroadcastHashJoin? Its possible that the optimizer thinks that it should be
> able to fit the table in memory from looking at its size on disk, but it
> actually cannot fit in memory. In this case you might want to look at
> tuning the autoBroadcastJoinThreshold.
>
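
If the broadcast estimate turns out to be the culprit, a minimal sketch of the
tuning mentioned above (the threshold is in bytes; -1 disables automatic
broadcast joins entirely):

// Lower the size below which Spark will broadcast a table, or disable it with -1.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)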
> Another potential case is that at the step it looks like the driver is
> "hanging" its attempting to load in a data source that is backed by a very
> large number of files. Spark maintains a cache of file paths for a data
> source to determine task splits, and we've seen the driver appear to hang
> and/or crash if you try to load in thousands (or more) of tiny files per
> partition, and you have a large number of partitions.
>
> Hope this helps.
>
> Nicholas Szandor Hakobian, Ph.D.
> Principal Data Scientist
> Rally Health
> nicholas.hakob...@rallyhealth.com
>
>
> On Thu, May 23, 2019 at 7:36 AM Ashic Mahtab  wrote:
>
>> Hi,
>> We have a quite long winded Spark application we inherited with many
>> stages. When we run on our spark cluster, things start off well enough.
>> Workers are busy, lots of progress made, etc. etc. However, 30 minutes into
>> processing, we see CPU usage of the workers drop drastically. At this time,
>> we also see that the driver is maxing out exactly one core (though we've
>> given it more than one), and its ram usage is creeping up. At this time,
>> there's no logs coming out on the driver. Everything seems to stop, and
>> then it suddenly starts working, and the workers start working again. The
>> driver ram doesn't go down, but flatlines. A few minutes later, the same
>> thing happens again - the world seems to stop. However, the driver soon
>> crashes with an out of memory exception.
>>
>> What could be causing this sort of behaviour on the driver? We don't have
>> any collect() or similar functions in the code. We're reading in from Azure
>> blobs, processing, and writing back to Azure blobs. Where should we start
>> in trying to get to the bottom of this? We're running Spark 2.4.1 in a
>> stand-alone cluster.
>>
>> Thanks,
>> Ashic.
>>
>


Re: double quote is automatically added when sinking as csv

2019-05-21 Thread Akshay Bhardwaj
Hi,

Add writeStream.option("quoteMode", "NONE")

By default the Spark Dataset API assumes that all values MUST BE enclosed
in the quote character (default: ") while writing to CSV files.
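
For clarity, a sketch of the suggested option slotted into the original
writeStream (only the relevant lines are shown; the built-in CSV writer also
exposes "quote", "quoteAll" and "escapeQuotes" options that influence quoting
behaviour):

resultDS.writeStream
  .format("csv")
  .option("sep", "\t")
  .option("quoteMode", "NONE")  // the option suggested above
  .option("path", DIR)
  .option("checkpointLocation", CheckPointDir)
  .start()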

Akshay Bhardwaj
+91-97111-33849


On Tue, May 21, 2019 at 5:34 PM 杨浩  wrote:

> We use Structured Streaming 2.2. When sinking as csv, a JSON string will
> automatically get quotes added, e.g. if an element is
>>
>> {"hello": "world"}
>
> the resulting data in the file system will be
>
>> "{\"hello\": \"world\"}"
>
>
> How do we avoid the quotes? We only want
>
>> {"hello": "world"}
>
> code like
>
>> resultDS.
>>   writeStream.
>>   outputMode(OutputMode.Append()).
>>   trigger(Trigger.ProcessingTime(TriggerInterval, TimeUnit.SECONDS)).
>>   format("csv").
>>   option("sep", "\t").
>>   option("path", DIR).
>>   option("checkpointLocation", CheckPointDir).
>>   option("compression", "gzip").
>>   partitionBy("event_day", "event_hour").
>>   start.
>>   awaitTermination()
>>
>
>


Re: Spark-YARN | Scheduling of containers

2019-05-20 Thread Akshay Bhardwaj
Hi Hari,

Thanks for this information.

Do you have any resources on, or can you explain, why YARN has this as the
default behaviour? What would be the advantages/scenarios of having multiple
assignments in a single heartbeat?


Regards
Akshay Bhardwaj
+91-97111-33849


On Mon, May 20, 2019 at 1:29 PM Hariharan  wrote:

> Hi Akshay,
>
> I believe HDP uses the capacity scheduler by default. In the capacity
> scheduler, assignment of multiple containers on the same node is
> determined by the option
> yarn.scheduler.capacity.per-node-heartbeat.multiple-assignments-enabled,
> which is true by default. If you would like YARN to spread out the
> containers, you can set this to false.
>
> You can learn about this and associated parameters here
> -
> https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
>
> ~ Hari
>
>
> On Mon, May 20, 2019 at 11:16 AM Akshay Bhardwaj
>  wrote:
> >
> > Hi All,
> >
> > Just floating this email again. Grateful for any suggestions.
> >
> > Akshay Bhardwaj
> > +91-97111-33849
> >
> >
> > On Mon, May 20, 2019 at 12:25 AM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
> >>
> >> Hi All,
> >>
> >> I am running Spark 2.3 on YARN using HDP 2.6
> >>
> >> I am running spark job using dynamic resource allocation on YARN with
> minimum 2 executors and maximum 6. My job read data from parquet files
> which are present on S3 buckets and store some enriched data to cassandra.
> >>
> >> My question is, how does YARN decide which nodes to launch containers?
> >> I have around 12 YARN nodes running in the cluster, but still i see
> repeated patterns of 3-4 containers launched on the same node for a
> particular job.
> >>
> >> What is the best way to start debugging this reason?
> >>
> >> Akshay Bhardwaj
> >> +91-97111-33849
>


Re: Spark-YARN | Scheduling of containers

2019-05-19 Thread Akshay Bhardwaj
Hi All,

Just floating this email again. Grateful for any suggestions.

Akshay Bhardwaj
+91-97111-33849


On Mon, May 20, 2019 at 12:25 AM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi All,
>
> I am running Spark 2.3 on YARN using HDP 2.6
>
> I am running spark job using dynamic resource allocation on YARN with
> minimum 2 executors and maximum 6. My job read data from parquet files
> which are present on S3 buckets and store some enriched data to cassandra.
>
> My question is, how does YARN decide which nodes to launch containers?
> I have around 12 YARN nodes running in the cluster, but still i see
> repeated patterns of 3-4 containers launched on the same node for a
> particular job.
>
> What is the best way to start debugging this reason?
>
> Akshay Bhardwaj
> +91-97111-33849
>


Spark-YARN | Scheduling of containers

2019-05-19 Thread Akshay Bhardwaj
Hi All,

I am running Spark 2.3 on YARN using HDP 2.6

I am running a spark job using dynamic resource allocation on YARN with
a minimum of 2 executors and a maximum of 6. My job reads data from parquet
files which are present in S3 buckets and stores some enriched data to Cassandra.
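
For reference, a minimal sketch of the dynamic-allocation settings described
above, expressed as SparkConf entries (the app name is hypothetical; dynamic
allocation on YARN also assumes the external shuffle service is enabled on the
NodeManagers):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("enrichment-job")                            // hypothetical name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "6")
  .config("spark.shuffle.service.enabled", "true")      // required for dynamic allocation on YARN
  .getOrCreate()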

My question is: how does YARN decide on which nodes to launch containers?
I have around 12 YARN nodes running in the cluster, but I still see
repeated patterns of 3-4 containers launched on the same node for a
particular job.

What is the best way to start debugging this?

Akshay Bhardwaj
+91-97111-33849


Re: Spark job gets hung on cloudera cluster

2019-05-16 Thread Akshay Bhardwaj
One of the reasons that jobs running on YARN (Spark, MR, Hive, etc.) can
get stuck is a data unavailability issue with HDFS.
This can arise if either the NameNode is not reachable or if a particular
data block is unavailable due to node failures.

Can you check if your YARN service can communicate with the NameNode service?

Akshay Bhardwaj
+91-97111-33849


On Thu, May 16, 2019 at 4:27 PM Rishi Shah  wrote:

> on yarn
>
> On Thu, May 16, 2019 at 1:36 AM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi Rishi,
>>
>> Are you running spark on YARN or spark's master-slave cluster?
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Thu, May 16, 2019 at 7:15 AM Rishi Shah 
>> wrote:
>>
>>> Any one please?
>>>
>>> On Tue, May 14, 2019 at 11:51 PM Rishi Shah 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> At times when there's a data node failure, running spark job doesn't
>>>> fail - it gets stuck and doesn't return. Any setting can help here? I would
>>>> ideally like to get the job terminated or executors running on those data
>>>> nodes fail...
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>
>
> --
> Regards,
>
> Rishi Shah
>


Re: Running spark with javaagent configuration

2019-05-15 Thread Akshay Bhardwaj
Hi Anton,

Do you have the option of storing the JAR file on HDFS, which can be
accessed via spark in your cluster?

Akshay Bhardwaj
+91-97111-33849


On Thu, May 16, 2019 at 12:04 AM Oleg Mazurov 
wrote:

> You can see what Uber JVM does at
> https://github.com/uber-common/jvm-profiler :
>
> --conf spark.jars=hdfs://hdfs_url/lib/jvm-profiler-1.0.0.jar
>> --conf spark.executor.extraJavaOptions=-javaagent:jvm-profiler-1.0.0.jar
>
>
> -- Oleg
>
> On Wed, May 15, 2019 at 6:28 AM Anton Puzanov 
> wrote:
>
>> Hi everyone,
>>
>> I want to run my spark application with javaagent, specifically I want to
>> use newrelic with my application.
>>
>> When I run spark-submit I must pass --conf
>> "spark.driver.extraJavaOptions=-javaagent="
>>
>> My problem is that I can't specify the full path as I run in cluster mode
>> and I don't know the exact host which will serve as the driver.
>> *Important:* I know I can upload the jar to every node, but it seems
>> like a fragile solution as machines will be added and removed later.
>>
>> I have tried specifying the jar with --files but couldn't make it work,
>> as I didn't know where exactly I should point the javaagent
>>
>> Any suggestions on what is the best practice to handle this kind of
>> problems? and what can I do?
>>
>> Thanks a lot,
>> Anton
>>
>


Re: Spark job gets hung on cloudera cluster

2019-05-15 Thread Akshay Bhardwaj
Hi Rishi,

Are you running spark on YARN or spark's master-slave cluster?

Akshay Bhardwaj
+91-97111-33849


On Thu, May 16, 2019 at 7:15 AM Rishi Shah  wrote:

> Any one please?
>
> On Tue, May 14, 2019 at 11:51 PM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> At times when there's a data node failure, running spark job doesn't fail
>> - it gets stuck and doesn't return. Any setting can help here? I would
>> ideally like to get the job terminated or executors running on those data
>> nodes fail...
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>


Spark Elasticsearch Connector | Index and Update

2019-05-10 Thread Akshay Bhardwaj
Hi All,

Are there any users who have integrated spark structured streaming  with
elastic search 6.x?

In the Elasticsearch docs, ElasticSearch Hadoop Configuration
<https://www.elastic.co/guide/en/elasticsearch/hadoop/6.6/configuration.html>,
there is a property es.write.operation which is defined as follows:

es.write.operation (default: index)
The write operation elasticsearch-hadoop should perform - can be any of:

   1. index (default) - new data is added while existing data (based on its
      id) is replaced (reindexed).
   2. create - adds new data; if the data already exists (based on its id),
      an exception is thrown.
   3. update - updates existing data (based on its id). If no data is found,
      an exception is thrown.
   4. upsert - known as merge or insert if the data does not exist, updates
      if the data exists (based on its id).


*Say if I have 2 documents in the partition, and based on a field I want to
index the document, and based on another field I want to update the
document with an inline script.*

*Is there a possibility to do this in the same writeStream for Elastic
Search in spark structured streaming?*

Akshay Bhardwaj
+91-97111-33849


Re: Structured Streaming Kafka - Weird behavior with performance and logs

2019-05-08 Thread Akshay Bhardwaj
Hi Austin,

A few questions:

   1. What is the partition count of the kafka topics used for input and
   output data?
   2. In the write stream, I recommend using "trigger" with a defined
   interval, if you prefer a micro-batching strategy,
   3. along with defining "maxOffsetsPerTrigger" in the kafka readStream
   options, which lets you choose the number of messages you want per trigger.
   (This helps in maintaining the expected threshold of executors/memory for
   the cluster.) A sketch of points 2 and 3 is shown below.
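
A minimal sketch of suggestions 2 and 3 applied to a Kafka-to-Kafka structured
stream (broker/topic names and the 10-second interval are placeholders, not
values from the original thread):

import org.apache.spark.sql.streaming.Trigger

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input-topic")
  .option("maxOffsetsPerTrigger", "5000")             // cap records fetched per micro-batch
  .load()

input.selectExpr("CAST(value AS STRING) AS value")    // the transform step would go here
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/checkpoints/example")
  .trigger(Trigger.ProcessingTime("10 seconds"))      // explicit micro-batch interval
  .start()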

For the repeated log messages, look in your logs for the streaming query
progress that is published. This progress status displays a lot of metrics that
should be your first diagnostic for identifying issues.
For a kafka stream, the progress status displays the "startOffset" and
"endOffset" values per batch, listed per topic-partition, giving the start
to end offsets of each trigger batch of the streaming query.


Akshay Bhardwaj
+91-97111-33849


On Tue, May 7, 2019 at 8:02 PM Austin Weaver  wrote:

> Hey Spark Experts,
>
> After listening to some of you, and the presentations at Spark Summit in
> SF, I am transitioning from d-streams to structured streaming however I am
> seeing some weird results.
>
> My use case is as follows: I am reading in a stream from a kafka topic,
> transforming a message, and writing the transformed message to another
> kafka topic.
>
> While running my stream, I can see the transformed messages on the output
> topic so I know the basic structure of my stream seems to be running as
> intended.
>
> Inside my transformation, I am logging the total transform time as well as
> the raw message being transformed. (Java by the way)
>
> The 2 weird things I am seeing:
> 1) I am seeing that the consumer lag for this particular consumer group on
> the input topic is increasing. This does not make sense to me - looking at
> the transform time from the logs, it should easily be able to handle the
> incoming feed. To give an example the transform times are < 10 ms per
> record and the sample of data does not contain > 100 messages per second.
> The stream should be reducing consumer lag as it runs (especially
> considering multiple workers and partitions)
>
> 2) I am seeing the same log transformation messages over and over on the
> dataproc spark cluster logs. For example, I am currently looking at my logs
> and the last 20+ log messages are the exact same
>
> I thought 2 may be due to offsets not being handled correctly, but I am
> seeing a reasonable range of transformed messages on the target topic, and
> I'm using the built in checkpointing for spark to handle the offsets for me.
>
> In terms of 1, why would I be seeing the same log messages over and over?
> It doesn't make sense to me - wouldn't the message only be transformed once
> and its offset committed?
>
> If anything stands out as incorrect, or something you've seen please let
> me know - this is rather new to me and my code seems to be following the
> same as other examples I see across the net
>
> Here's a redacted snippet of my stream:
>
> spark.readStream().format("kafka").option("kafka.bootstrap.servers",
> "X")
> .option("kafka.partition.assignment.strategy",
> RoundRobinAssignor.class.getName())
> .option("subscribe", """")
> .option("startingOffsets", "earliest")
> .load()
> .select("value")
> .as(Encoders.STRING())
> .map((MapFunction) value -> transform(value),
> Encoders.STRING())
> .writeStream()
> .format("kafka")
> .option("kafka.bootstrap.servers", "X")
> .option("topic", ""X"")
> .outputMode("append")
> .option("checkpointLocation", "/checkpoints/testCheckpoint")
> .start()
> .awaitTermination();
>
> Thanks!
> Austin
>


What is Spark context cleaner in structured streaming

2019-05-02 Thread Akshay Bhardwaj
Hi All,

I am using Spark structured streaming with spark 2.3 running on a Yarn
cluster with hadoop 2.7.3. In the driver logs I see numerous lines like the below.

2019-05-02 17:14:24.619 ContextCleaner Spark Context Cleaner [INFO] Cleaned
accumulator 81492577
2019-05-02 17:14:24.619 ContextCleaner Spark Context Cleaner [INFO] Cleaned
accumulator 81492514
2019-05-02 17:14:24.619 ContextCleaner Spark Context Cleaner [INFO] Cleaned
accumulator 81492531
2019-05-02 17:14:24.620 ContextCleaner Spark Context Cleaner [INFO] Cleaned
accumulator 81492524
2019-05-02 17:14:24.620 ContextCleaner Spark Context Cleaner [INFO] Cleaned
accumulator 81492617
2019-05-02 17:14:24.620 ContextCleaner Spark Context Cleaner [INFO] Cleaned
accumulator 81492475
2019-05-02 17:14:24.620 ContextCleaner Spark Context Cleaner [INFO] Cleaned
accumulator 81492467

Could anyone explain what these logs indicate internally? I have a
streaming interval of 500ms, reading data from a Kafka topic with max batch
size of 1000.


Akshay Bhardwaj
+91-97111-33849


Re: Spark Structured Streaming | Highly reliable de-duplication strategy

2019-05-01 Thread Akshay Bhardwaj
Hi Anastasios,

Thanks for this.
I have a few doubts with this approach. The dropDuplicates operation will
keep all the data across triggers (see the sketch below).

   1. Where is this data stored?
  - IN_MEMORY state means the data is not persisted across job resubmits.
  - Persistence on disk like HDFS has proved to be unreliable, as I
  have encountered corrupted files which cause errors on job restarts.
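
For reference, the deduplication pattern from the guide linked below bounds
this state with a watermark; a minimal sketch (the event-time column
"eventTime", the key column "uid" and the 10-minute bound are assumptions):

val deduped = streamingDf
  .withWatermark("eventTime", "10 minutes")  // state older than the watermark can be dropped
  .dropDuplicates("uid", "eventTime")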



Akshay Bhardwaj
+91-97111-33849


On Wed, May 1, 2019 at 3:20 PM Anastasios Zouzias  wrote:

> Hi,
>
> Have you checked the docs, i.e.,
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication
>
> You can generate a uuid column in your streaming DataFrame and drop
> duplicate messages with a single line of code.
>
> Best,
> Anastasios
>
> On Wed, May 1, 2019 at 11:15 AM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi All,
>>
>> Floating this again. Any suggestions?
>>
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Tue, Apr 30, 2019 at 7:30 PM Akshay Bhardwaj <
>> akshay.bhardwaj1...@gmail.com> wrote:
>>
>>> Hi Experts,
>>>
>>> I am using spark structured streaming to read message from Kafka, with a
>>> producer that works with at-least once guarantee. This streaming job is
>>> running on Yarn cluster with hadoop 2.7 and spark 2.3
>>>
>>> What is the most reliable strategy for avoiding duplicate data within
>>> stream in the scenarios of fail-over or job restarts/re-submits, and
>>> guarantee exactly once non-duplicate stream?
>>>
>>>
>>>1. One of the strategies I have read other people using is to
>>>maintain an external KV store for unique-key/checksum of the incoming
>>>message, and write to a 2nd kafka topic only if the checksum is not 
>>> present
>>>in KV store.
>>>- My doubts with this approach is how to ensure safe write to both
>>>   the 2nd topic and to KV store for storing checksum, in the case of 
>>> unwanted
>>>   failures. How does that guarantee exactly-once with restarts?
>>>
>>> Any suggestions are highly appreciated.
>>>
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>
>
> --
> -- Anastasios Zouzias
> 
>


Re: Spark Structured Streaming | Highly reliable de-duplication strategy

2019-05-01 Thread Akshay Bhardwaj
Hi All,

Floating this again. Any suggestions?


Akshay Bhardwaj
+91-97111-33849


On Tue, Apr 30, 2019 at 7:30 PM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi Experts,
>
> I am using spark structured streaming to read message from Kafka, with a
> producer that works with at-least once guarantee. This streaming job is
> running on Yarn cluster with hadoop 2.7 and spark 2.3
>
> What is the most reliable strategy for avoiding duplicate data within
> stream in the scenarios of fail-over or job restarts/re-submits, and
> guarantee exactly once non-duplicate stream?
>
>
>1. One of the strategies I have read other people using is to maintain
>an external KV store for unique-key/checksum of the incoming message, and
>write to a 2nd kafka topic only if the checksum is not present in KV store.
>- My doubts with this approach is how to ensure safe write to both the
>   2nd topic and to KV store for storing checksum, in the case of unwanted
>   failures. How does that guarantee exactly-once with restarts?
>
> Any suggestions are highly appreciated.
>
>
> Akshay Bhardwaj
> +91-97111-33849
>


Spark Structured Streaming | Highly reliable de-duplication strategy

2019-04-30 Thread Akshay Bhardwaj
Hi Experts,

I am using spark structured streaming to read messages from Kafka, with a
producer that works with an at-least-once guarantee. This streaming job is
running on a Yarn cluster with hadoop 2.7 and spark 2.3.

What is the most reliable strategy for avoiding duplicate data within the
stream in the scenarios of fail-over or job restarts/re-submits, and for
guaranteeing an exactly-once, non-duplicate stream?


   1. One of the strategies I have read other people using is to maintain
   an external KV store for unique-key/checksum of the incoming message, and
   write to a 2nd kafka topic only if the checksum is not present in KV store.
   - My doubts with this approach is how to ensure safe write to both the
  2nd topic and to KV store for storing checksum, in the case of unwanted
  failures. How does that guarantee exactly-once with restarts?

Any suggestions are highly appreciated.


Akshay Bhardwaj
+91-97111-33849


Re: Issue with offset management using Spark on Dataproc

2019-04-30 Thread Akshay Bhardwaj
Hi Austin,

Are you using Spark Streaming or Structured Streaming?

For better understanding, could you also provide sample code/config params
for your spark-kafka connector for the said streaming job?


Akshay Bhardwaj
+91-97111-33849


On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver  wrote:

> Hey guys, relatively new Spark Dev here and i'm seeing some kafka offset
> issues and was wondering if you guys could help me out.
>
> I am currently running a spark job on Dataproc and am getting errors
> trying to re-join a group and read data from a kafka topic. I have done
> some digging and am not sure what the issue is. I have auto.offset.reset set
> to earliest so it should being reading from the earliest available
> non-committed offset and initially my spark logs look like this :
>
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-11 to offset 5553330.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-2 to offset 553.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-3 to offset 484.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-4 to offset 586.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-5 to offset 502.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-6 to offset 561.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-7 to offset 542.```
>
> But then the very next line I get an error trying to read from a
> nonexistent offset on the server (you can see that the offset for the
> partition differs from the one listed above, so I have no idea why it would
> be attempting to read form that offset, here is the error on the next line:
>
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
> out of range with no configured reset policy for partitions:
> {demo.topic-11=4544296}
>
> Any ideas to why my spark job is constantly going back to this offset
> (4544296), and not the one it outputs originally (5553330)?
>
> It seems to be contradicting itself w a) the actual offset it says its on
> and the one it attempts to read and b) saying no configured reset policy
> --
> Austin Weaver
> Software Engineer
> FLYR, Inc.   www.flyrlabs.com
>


Re: spark structured streaming crash due to decompressing gzip file failure

2019-03-07 Thread Akshay Bhardwaj
Hi,

In your spark-submit command, try using the below config property and see
if this solves the problem.

--conf spark.sql.files.ignoreCorruptFiles=true

For me this worked to skip empty/partially uploaded gzip files in an S3
bucket.

Akshay Bhardwaj
+91-97111-33849


On Thu, Mar 7, 2019 at 11:28 AM Lian Jiang  wrote:

> Hi,
>
> I have a structured streaming job which listens to a hdfs folder
> containing jsonl.gz files. The job crashed due to error:
>
> java.io.IOException: incorrect header check
> at
> org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native
> Method)
> at
> org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
> at
> org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
> at
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
> at
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
> at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
> at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Is there a way to skip the gz files that cannot be decompressed? Exception
> handling seems not help. The only workaround I can think of is to
> decompress the gz files into another folder first and make the spark
> streaming job listen to this new folder. But this workaround may not be
> better compared with the solution using a unstructured streaming job to
> directly decompress the gz file, read jsonl file, validate the records and
> write the validated records into parquet.
>
> Any idea is highly appreciated!
>
>
>
>
>


Re: Structured Streaming to Kafka Topic

2019-03-06 Thread Akshay Bhardwaj
Hi Pankaj,

What version of Spark are you using?

If you are using 2.4+ then there is an inbuilt function "to_json" which
converts the columns of your dataset to JSON format.
https://spark.apache.org/docs/2.4.0/api/sql/#to_json
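
For illustration, a minimal sketch of using to_json together with struct to
build the Kafka "value" column from the transformed dataset, instead of a
hand-written UDF (variable names, topic and paths are assumptions):

import org.apache.spark.sql.functions.{col, struct, to_json}

val kafkaReady = transformedDs
  .select(to_json(struct(transformedDs.columns.map(col): _*)).alias("value"))

kafkaReady.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_out")         // hypothetical output topic
  .option("checkpointLocation", "/tmp/chk")   // hypothetical checkpoint path
  .start()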

Akshay Bhardwaj
+91-97111-33849


On Wed, Mar 6, 2019 at 10:29 PM Pankaj Wahane  wrote:

> Hi,
>
> I am using structured streaming for ETL.
>
> val data_stream = spark
>   .readStream // constantly expanding dataframe
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "sms_history")
>   .option("startingOffsets", "earliest") // begin from start of topic
>   .option("failOnDataLoss", "false")
>   .load()
>
> I transform this into a DataSet with following schema.
>
> root
>  |-- accountId: long (nullable = true)
>  |-- countryId: long (nullable = true)
>  |-- credits: double (nullable = true)
>  |-- deliveryStatus: string (nullable = true)
>  |-- senderId: string (nullable = true)
>  |-- sentStatus: string (nullable = true)
>  |-- source: integer (nullable = true)
>  |-- createdOn: timestamp (nullable = true)
>  |-- send_success_credits: double (nullable = true)
>  |-- send_error_credits: double (nullable = true)
>  |-- delivered_credits: double (nullable = true)
>  |-- invalid_sd_credits: double (nullable = true)
>  |-- undelivered_credits: double (nullable = true)
>  |-- unknown_credits: double (nullable = true)
>
>
> Now I want to write this transformed stream to another Kafka topic. I have
> temporarily used a UDF that accepts all these columns as parameters and
> create a json string for adding a column "value" for writing to Kafka.
>
> Is there easier and cleaner way to do the same?
>
>
> Thanks,
> Pankaj
>
>


Re: "java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error

2019-03-06 Thread Akshay Bhardwaj
Sorry message sent as incomplete.

To better debug the issue, please check the below config properties:

   - At Kafka consumer properties
  - max.partition.fetch.bytes within spark kafka consumer. If not set
  for consumer then the global config at broker level.
  - request.timeout.ms
   - At spark's configurations
  - spark.streaming.kafka.consumer.poll.ms
  - spark.network.timeout (If the above is not set, then poll.ms is
  default to spark.network.timeout)


Generally I have faced this issue if spark.streaming.kafka.consumer.poll.ms is
less than request.timeout.ms

Also, what is the average kafka record message size in bytes?



Akshay Bhardwaj
+91-97111-33849


On Wed, Mar 6, 2019 at 1:26 PM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi,
>
> To better debug the issue, please check the below config properties:
>
>- max.partition.fetch.bytes within spark kafka consumer. If not set
>for consumer then the global config at broker level.
>- spark.streaming.kafka.consumer.poll.ms
>   - spark.network.timeout (If the above is not set, then poll.ms is
>   default to spark.network.timeout)
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Wed, Mar 6, 2019 at 8:39 AM JF Chen  wrote:
>
>> When my kafka executor reads data from kafka, sometimes it throws the
>> error "java.lang.AssertionError: assertion failed: Failed to get records
>> for  after polling for 18" , which after 3 minutes of executing.
>> The data waiting for read is not so huge, which is about 1GB. And other
>> partitions read by other tasks are very fast, the error always occurs on
>> some specific executor..
>>
>> Regard,
>> Junfeng Chen
>>
>


Re: "java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error

2019-03-05 Thread Akshay Bhardwaj
Hi,

To better debug the issue, please check the below config properties:

   - max.partition.fetch.bytes within spark kafka consumer. If not set for
   consumer then the global config at broker level.
   - spark.streaming.kafka.consumer.poll.ms
  - spark.network.timeout (If the above is not set, then poll.ms is
  default to spark.network.timeout)

Akshay Bhardwaj
+91-97111-33849


On Wed, Mar 6, 2019 at 8:39 AM JF Chen  wrote:

> When my kafka executor reads data from kafka, sometimes it throws the
> error "java.lang.AssertionError: assertion failed: Failed to get records
> for  after polling for 18" , which after 3 minutes of executing.
> The data waiting for read is not so huge, which is about 1GB. And other
> partitions read by other tasks are very fast, the error always occurs on
> some specific executor..
>
> Regard,
> Junfeng Chen
>


Re: Is there a way to validate the syntax of raw spark sql query?

2019-03-05 Thread Akshay Bhardwaj
Hi Kant,

You can try "explaining" the sql query.

spark.sql(sqlText).explain(true); //the parameter true is to get more
verbose query plan and it is optional.


This is the safest way to validate sql without actually executing/creating
a df/view in spark. It validates syntax as well as schema of tables/views
used.
If there is an issue with your SQL syntax then the method throws the below
exception, which you can catch:

org.apache.spark.sql.catalyst.parser.ParseException
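
Putting the suggestion above together, a minimal sketch of a validation helper
(the explain call plans the query without executing it; AnalysisException is
also caught here to cover missing tables/columns):

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.parser.ParseException

def isValid(spark: SparkSession, sqlText: String): Boolean =
  try {
    spark.sql(sqlText).explain(true)  // parses and analyses the query, no execution
    true
  } catch {
    case _: ParseException | _: AnalysisException => false
  }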


Hope this helps!



Akshay Bhardwaj
+91-97111-33849


On Fri, Mar 1, 2019 at 10:23 PM kant kodali  wrote:

> Hi All,
>
> Is there a way to validate the syntax of raw spark SQL query?
>
> for example, I would like to know if there is any isValid API call spark
> provides?
>
> val query = "select * from table"if(isValid(query)) {
> sparkSession.sql(query) } else {
> log.error("Invalid Syntax")}
>
> I tried the following
>
> val query = "select * morf table" // Invalid queryval parser = 
> spark.sessionState.sqlParsertry{
> parser.parseExpression(query)} catch (ParseException ex) {
> throw new Exception(ex); //Exception not getting thrown}Dataset<>Row df = 
> sparkSession.sql(query) // Exception gets thrown here
> df.writeStream.format("console").start()
>
> Question: parser.parseExpression is not catching the invalid syntax
> before I hit the sparkSession.sql. In other words, it is not being helpful in
> the above code. Any reason? My whole goal is to catch syntax errors before
> I pass the query on to sparkSession.sql.
>
>
>


Re: Spark 2.4.0 Master going down

2019-02-28 Thread Akshay Bhardwaj
Hi Lokesh,

Please provide further information to help identify the issue.

1) Are you running in standalone mode or cluster mode? If cluster, then is
it a spark master/slave cluster or YARN/Mesos?
2) Have you tried checking if all ports between your master and the machine
with IP 192.168.43.167 are accessible?
3) Have you checked the memory consumption of the executors/driver running
in the cluster?


Akshay Bhardwaj
+91-97111-33849


On Wed, Feb 27, 2019 at 8:27 PM lokeshkumar  wrote:

> Hi All
>
> We are running Spark version 2.4.0 and we run a few Spark streaming jobs
> listening on Kafka topics. We receive an average of 10-20 msgs per second.
> The Spark master has been going down after 1-2 hours of running.
> The exception is given below.
> Along with that, the spark executors also get killed.
>
> This was not happening with Spark 2.1.1; it started happening with Spark
> 2.4.0. Any help/suggestion is appreciated.
>
> The exception that we see is
>
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
> at
>
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
> at
>
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
> at
>
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
> at
>
> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any
> reply from 192.168.43.167:40007 in 120 seconds. This timeout is controlled
> by spark.rpc.askTimeout
> at
> org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
> at
>
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
> at
>
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
> at
>
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
> at scala.util.Try$.apply(Try.scala:192)
> at scala.util.Failure.recover(Try.scala:216)
> at
> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
> at
> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
>
> org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
> at
>
> scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at scala.concurrent.Promise$class.complete(Promise.scala:55)
> at
> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
> at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
> at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
>
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
> at
>
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
> at
>
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
> at
>
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
> at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at
> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
> at
>
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:157)

Re: Spark Streaming - Proeblem to manage offset Kafka and starts from the beginning.

2019-02-27 Thread Akshay Bhardwaj
Hi Gabor,

I guess you are looking at Kafka 2.1 but Guillermo mentioned initially that
they are working with Kafka 1.0

Akshay Bhardwaj
+91-97111-33849


On Wed, Feb 27, 2019 at 5:41 PM Gabor Somogyi 
wrote:

> Where exactly? In Kafka broker configuration section here it's 10080:
> https://kafka.apache.org/documentation/
>
> offsets.retention.minutes: After a consumer group loses all its consumers
> (i.e. becomes empty) its offsets will be kept for this retention period
> before getting discarded. For standalone consumers (using manual
> assignment), offsets will be expired after the time of last commit plus
> this retention period. (Type: int, Default: 10080, Valid values: [1,...],
> Importance: high, Update mode: read-only)
>
> On Wed, Feb 27, 2019 at 1:04 PM Guillermo Ortiz 
> wrote:
>
>> I'm going to check the value, but I didn't change it. Normally, the
>> process is always running but sometimes I have to restart it to apply some
>> changes. Sometimes it starts from the beginning and other times it continues
>> from the last offset.
>>
>> El mié., 27 feb. 2019 a las 12:25, Akshay Bhardwaj (<
>> akshay.bhardwaj1...@gmail.com>) escribió:
>>
>>> Hi Gabor,
>>>
>>> I am talking about offset.retention.minutes which is set default as
>>> 1440 (or 24 hours)
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Wed, Feb 27, 2019 at 4:47 PM Gabor Somogyi 
>>> wrote:
>>>
>>>> Hi Akshay,
>>>>
>>>> The feature what you've mentioned has a default value of 7 days...
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Wed, Feb 27, 2019 at 7:38 AM Akshay Bhardwaj <
>>>> akshay.bhardwaj1...@gmail.com> wrote:
>>>>
>>>>> Hi Guillermo,
>>>>>
>>>>> What was the interval in between restarting the spark job? As a
>>>>> feature in Kafka, a broker deleted offsets for a consumer group after
>>>>> inactivity of 24 hours.
>>>>> In such a case, the newly started spark streaming job will read
>>>>> offsets from beginning for the same groupId.
>>>>>
>>>>> Akshay Bhardwaj
>>>>> +91-97111-33849
>>>>>
>>>>>
>>>>> On Thu, Feb 21, 2019 at 9:08 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> From the info you've provided not much to say.
>>>>>> Maybe you could collect sample app, logs etc, open a jira and we can
>>>>>> take a deeper look at it...
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 21, 2019 at 4:14 PM Guillermo Ortiz 
>>>>>> wrote:
>>>>>>
>>>>>>> I' working with Spark Streaming 2.0.2 and Kafka 1.0.0 using Direct
>>>>>>> Stream as connector. I consume data from Kafka and autosave the offsets.
>>>>>>> I can see Spark doing commits in the logs of the last offsets
>>>>>>> processed, Sometimes I have restarted spark and it starts from the
>>>>>>> beginning, when I'm using the same groupId.
>>>>>>>
>>>>>>> Why could it happen? it only happen rarely.
>>>>>>>
>>>>>>


Re: Spark Streaming - Proeblem to manage offset Kafka and starts from the beginning.

2019-02-27 Thread Akshay Bhardwaj
Hi Gabor,

I am talking about offset.retention.minutes which is set default as 1440
(or 24 hours)

Akshay Bhardwaj
+91-97111-33849


On Wed, Feb 27, 2019 at 4:47 PM Gabor Somogyi 
wrote:

> Hi Akshay,
>
> The feature what you've mentioned has a default value of 7 days...
>
> BR,
> G
>
>
> On Wed, Feb 27, 2019 at 7:38 AM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi Guillermo,
>>
>> What was the interval in between restarting the spark job? As a feature
>> in Kafka, a broker deleted offsets for a consumer group after inactivity of
>> 24 hours.
>> In such a case, the newly started spark streaming job will read offsets
>> from beginning for the same groupId.
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Thu, Feb 21, 2019 at 9:08 PM Gabor Somogyi 
>> wrote:
>>
>>> From the info you've provided not much to say.
>>> Maybe you could collect sample app, logs etc, open a jira and we can
>>> take a deeper look at it...
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Thu, Feb 21, 2019 at 4:14 PM Guillermo Ortiz 
>>> wrote:
>>>
>>>> I' working with Spark Streaming 2.0.2 and Kafka 1.0.0 using Direct
>>>> Stream as connector. I consume data from Kafka and autosave the offsets.
>>>> I can see Spark doing commits in the logs of the last offsets
>>>> processed, Sometimes I have restarted spark and it starts from the
>>>> beginning, when I'm using the same groupId.
>>>>
>>>> Why could it happen? it only happen rarely.
>>>>
>>>


Re: Spark Streaming - Proeblem to manage offset Kafka and starts from the beginning.

2019-02-26 Thread Akshay Bhardwaj
Hi Guillermo,

What was the interval in between restarting the spark job? As a feature in
Kafka, a broker deletes offsets for a consumer group after inactivity of 24
hours.
In such a case, the newly started spark streaming job will read offsets
from the beginning for the same groupId.

Akshay Bhardwaj
+91-97111-33849


On Thu, Feb 21, 2019 at 9:08 PM Gabor Somogyi 
wrote:

> From the info you've provided not much to say.
> Maybe you could collect sample app, logs etc, open a jira and we can take
> a deeper look at it...
>
> BR,
> G
>
>
> On Thu, Feb 21, 2019 at 4:14 PM Guillermo Ortiz 
> wrote:
>
>> I'm working with Spark Streaming 2.0.2 and Kafka 1.0.0 using Direct Stream
>> as the connector. I consume data from Kafka and autosave the offsets.
>> I can see Spark doing commits of the last offsets processed in the logs.
>> Sometimes I have restarted spark and it starts from the beginning, even when
>> I'm using the same groupId.
>>
>> Why could it happen? it only happen rarely.
>>
>


Re: Spark 2.3 | Structured Streaming | Metric for numInputRows

2019-02-26 Thread Akshay Bhardwaj
If it helps, below is the same query progress report that I am able to
fetch from streaming query

{
  "id" : "f2cb24d4-622e-4355-b315-8e440f01a90c",
  "runId" : "6f3834ff-10a9-4f57-ae71-8a434ee519ce",
  "name" : "query_name_1",
  "timestamp" : "2019-02-27T06:06:58.500Z",
  "batchId" : 3725,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"addBatch" : 275,
"getBatch" : 3,
"getOffset" : 8,
"queryPlanning" : 79,
"triggerExecution" : 409,
"walCommit" : 43
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[SubscribePattern[kafka_events_topic]]",
"startOffset" : {
  "kafka_events_topic" : {
"2" : 32822078,
"1" : 114248484,
"0" : 114242134
  }
},
"endOffset" : {
  "kafka_events_topic" : {
"2" : 32822496,
"1" : 114248896,
"0" : 114242552
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "ForeachSink"
  }
}



Akshay Bhardwaj
+91-97111-33849


On Wed, Feb 27, 2019 at 11:36 AM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi Experts,
>
> I have a structured streaming query running on spark 2.3 over yarn
> cluster, with below features:
>
>- Reading JSON messages from Kafka topic with:
>   - maxOffsetsPerTrigger as 5000
>   - trigger interval of my writeStream task is 500ms.
>   - streaming dataset is defined as events with  fields: id, name,
>   refid, createdTime
>- A cached dataset of CSV file read from HDFS, such that, the CSV file
>contains a list of prohibited events refid
>- I have defined an intermediate dataset with the following query,
>which filters out prohibited events from the streaming data
>   - select * from events where event.refid NOT IN (select refid from
>   CSVData)
>
>
> The query progress from StreamingQuery object, it shows metrics as
> numInputRows, inputRowsPerSecond and processedRowsPerSecond as 0, although
> my query is executing with an execution time of ~400ms. And I can see that
> the query does take records from kafka and writes the processed data to the
> output database.
>
> If I remove the event filtering tasks, then all the metrics are displayed
> properly.
>
> Can anyone please point out why this behaviour is observed and how to
> gather metrics like numInputRows, etc while also filtering events fetched
> from CSV file?
> I am also open to suggestions if there is a better way of filtering out
> the prohibited events in structured streaming.
>
> Thanks in advance.
>
> Akshay Bhardwaj
> +91-97111-33849
>


Spark 2.3 | Structured Streaming | Metric for numInputRows

2019-02-26 Thread Akshay Bhardwaj
Hi Experts,

I have a structured streaming query running on spark 2.3 over yarn cluster,
with below features:

   - Reading JSON messages from Kafka topic with:
  - maxOffsetsPerTrigger as 5000
  - trigger interval of my writeStream task is 500ms.
  - streaming dataset is defined as events with  fields: id, name,
  refid, createdTime
   - A cached dataset of CSV file read from HDFS, such that, the CSV file
   contains a list of prohibited events refid
   - I have defined an intermediate dataset with the following query, which
   filters out prohibited events from the streaming data
  - select * from events where event.refid NOT IN (select refid from
  CSVData)


The query progress from the StreamingQuery object shows the metrics
numInputRows, inputRowsPerSecond and processedRowsPerSecond as 0, although
my query is executing with an execution time of ~400ms. And I can see that
the query does take records from kafka and writes the processed data to the
output database.

If I remove the event filtering tasks, then all the metrics are displayed
properly.

Can anyone please point out why this behaviour is observed and how to
gather metrics like numInputRows, etc while also filtering events fetched
from CSV file?
I am also open to suggestions if there is a better way of filtering out the
prohibited events in structured streaming.

Thanks in advance.

Akshay Bhardwaj
+91-97111-33849