Spark Streaming fails with unable to get records after polling for 512 ms

2017-11-14 Thread jkagitala
Hi,

I'm trying to add Spark Streaming on top of our Kafka topic, but I keep getting
this error:
java.lang.AssertionError: assertion failed: Failed to get record after
polling for 512 ms.

I tried setting different params such as max.poll.interval.ms and
spark.streaming.kafka.consumer.poll.ms to 12000 ms in kafkaParams, but I still
get "failed to get records after 512 ms". Even with these params set, the
polling timeout does not seem to change.

Without Spark Streaming I'm able to fetch the records; I only get this error
when the Spark Streaming integration is added.

Any help is greatly appreciated. Below is the code I'm using.

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlingerSparkApplication")
        .setMaster("local[*]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

kafkaParams.put("bootstrap.servers", hosts);
kafkaParams.put("group.id", groupid);
kafkaParams.put("auto.commit.enable", false);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", BytesDeserializer.class);
kafkaParams.put("auto.offset.reset", "earliest");
//kafkaParams.put("max.poll.interval.ms", 12000);
//kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
//kafkaParams.put("request.timeout.ms", 12000);

JavaInputDStream<ConsumerRecord<String, Bytes>> messages =
        KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));

messages.foreachRDD(rdd -> {
    List<ConsumerRecord<String, Bytes>> input = rdd.collect();
    System.out.println("count is " + input.size());
});
ssc.start();
ssc.awaitTermination();
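
One variant I still plan to try (an untested sketch on my end; I'm assuming
spark.streaming.kafka.consumer.poll.ms is a Spark property that is read from the
SparkConf rather than from kafkaParams, so setting it in kafkaParams would have
no effect -- please correct me if that assumption is wrong):

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlingerSparkApplication")
        .setMaster("local[*]")
        // Assumption: the poll timeout is a Spark config, so it belongs here,
        // not in the Kafka consumer params.
        .set("spark.streaming.kafka.consumer.poll.ms", "12000");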

Thanks
Jagadish






Executor not getting added in Spark UI & Spark event log in deployMode: cluster

2017-11-14 Thread Mamillapalli, Purna Pradeep
Hi all,

I'm performing a spark-submit using the Spark REST API (a POST on port 6066)
with the config shown in the launch command below:

> Launch Command:
> "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.141-1.b16.el7_3.x86_64/jre/bin/java"
> "-cp" "/usr/local/spark/conf/:/usr/local/spark/jars/*" "-Xmx4096M"
> "-Dspark.eventLog.enabled=true"
> "-Dspark.app.name=WorkflowApp"
> "-Dspark.submit.deployMode=cluster"
> "-Dspark.local.dir=/data0,/data1,/data2,/data3"
> "-Dspark.executor.cores=2" "-Dspark.master=spark://:7077"
> "-Dspark.serializer=org.apache.spark.serializer.KryoSerializer"
> "-Dspark.jars=s3a://<***>.jar" "-Dspark.driver.supervise=false"
> "-Dspark.history.fs.logDirectory=s3a://<*>/"
> "-Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256"
> "-Dspark.driver.memory=4G" "-Dspark.executor.memory=4G"
> "-Dspark.eventLog.dir=s3a://<*>/"
> "org.apache.spark.deploy.worker.DriverWrapper" "spark://Worker@<***>"
> "/usr/local/spark/work/driver-<***>.jar" "MyApp" "-c" "s3a://<***>"


When I looked into the Spark event log, below is what I observed:

{"Event":"SparkListenerExecutorAdded","Timestamp":1510633498623,"Executor 
ID":"driver","Executor Info":{"Host":"localhost","Total Cores":2,"Log Urls":{}}}
"spark.master":"local[*]"


Though I ran with deployMode set to cluster, the slave IP is not shown in the
Host section and spark.master is shown as local[*] above. Because of this the
job runs only on the driver, and when the job is submitted it does not show up
in http://:8080 under Running and Completed Applications; it shows up only
under Running Drivers & Completed Drivers. Please suggest.





Re: Process large JSON file without causing OOM

2017-11-14 Thread Alec Swan
Thanks all. I am not submitting a spark job explicitly. Instead, I am using
the Spark library functionality embedded in my web service as shown in the
code I included in the previous email. So, effectively Spark SQL runs in
the web service's JVM. Therefore, --driver-memory option would not (and did
not) work for me.

I did try setting the following environment variables
SPARK_DRIVER_MEMORY=5g;SPARK_EXECUTOR_MEMORY=5g but they didn't have any
effect. Passing "-Dspark.executor.memory=6g  -Dspark.driver.memory=6g" JVM
parameters had the same effect as setting them in SparkConf in the code,
i.e. they showed up in Spark UI but I still got OOM.

My use case is somewhat strange because I just wanted to use the Spark SQL
library for its multi-format (ORC, Parquet, JSON) support and don't really need
the rest of Spark's functionality. Should I be considering submitting my Spark
code as a job (to be run locally) from the web service code?
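
If it helps the discussion, this is roughly what I have in mind for that option
(a minimal, untested sketch using org.apache.spark.launcher.SparkLauncher; the
jar path, main class and memory value are placeholders):

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

// Untested sketch: launch the conversion as a separate local Spark job from the
// web service, so that spark.driver.memory actually governs the driver heap
// instead of the web service's own -Xmx setting.
SparkAppHandle handle = new SparkLauncher()
        .setAppResource("/path/to/json-to-orc-job.jar")   // placeholder
        .setMainClass("com.example.JsonToOrcJob")          // placeholder
        .setMaster("local[*]")
        .setConf(SparkLauncher.DRIVER_MEMORY, "6g")
        .startApplication();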

So far, in this thread we've been focusing on configuring larger memory pools.
But I wonder if there is a way to stream/batch the content of the JSON file in
order to convert it to ORC piecemeal and avoid reading the whole JSON file into
memory in the first place?




Thanks,

Alec

On Tue, Nov 14, 2017 at 2:58 AM, Sonal Goyal  wrote:

> If you are running Spark with local[*] as master, there will be a single
> process whose memory will be controlled by --driver-memory command line
> option to spark submit. Check
>
> http://spark.apache.org/docs/latest/configuration.html
>
> spark.driver.memory (default: 1g): Amount of memory to use for the driver
> process, i.e. where SparkContext is initialized (e.g. 1g, 2g).
> *Note:* In client mode, this config must not be set through the SparkConf
> directly in your application, because the driver JVM has already started at
> that point. Instead, please set this through the --driver-memory command line
> option or in your default properties file.
>
> Thanks,
> Sonal
> Nube Technologies 
>
> 
>
>
>
> On Tue, Nov 14, 2017 at 9:37 AM, Alec Swan  wrote:
>
>> Hi Joel,
>>
>> Here are the relevant snippets of my code and an OOM error thrown
>> in frameWriter.save(..). Surprisingly, the heap dump is pretty small ~60MB
>> even though I am running with -Xmx10G and 4G executor and driver memory as
>> shown below.
>>
>> SparkConf sparkConf = new SparkConf()
>>         .setAppName("My Service")
>>         .setMaster("local[*]")
>>         .set("spark.ui.enabled", "true")
>>         .set("spark.executor.memory", "4G")
>>         .set("spark.driver.memory", "4G");
>>
>> sparkSessionBuilder = SparkSession.builder().config(sparkConf).enableHiveSupport();
>>
>> Dataset<Row> events = sparkSession.read()
>>         .format("json")
>>         .schema(inputConfig.getSchema())
>>         .load(inputFile.getPath());
>>
>> DataFrameWriter<Row> frameWriter = events
>>         .selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns())) // select "data.customer AS `customer`", ...
>>         .write()
>>         .options(outputConfig.getProperties()) // compression=zlib
>>         .format("orc")
>>         .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions())); // partition by "customer"
>>
>> frameWriter.save(outputUri.getPath());
>>
>>
>> Here is the error log I get at runtime:
>>
>> 17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
>> 17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
>> java.lang.OutOfMemoryError: Java heap space
>> Dumping heap to java_pid3790.hprof ...
>> Heap dump file created [62653841 bytes in 2.212 secs]
>> #
>> # java.lang.OutOfMemoryError: Java heap space
>> # -XX:OnOutOfMemoryError="kill -9 %p"
>> #   Executing "kill -9 3790"...
>>
>>
>> And here is the thread from the thread dump that caused OOM:
>>
>> "Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
>> at java.lang.OutOfMemoryError.(OutOfMemoryError.java:48)
>> at org.apache.hadoop.io.compress.BlockDecompressorStream.getCom
>> pressedData(BlockDecompressorStream.java:123)
>> at org.apache.hadoop.io.compress.BlockDecompressorStream.decomp
>> ress(BlockDecompressorStream.java:98)
>> at org.apache.hadoop.io.compress.DecompressorStream.read(Decomp
>> ressorStream.java:85)
>> at java.io.InputStream.read(InputStream.java:101)
>> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>>Local Variable: byte[]#3957
>>Local Variable: org.apache.hadoop.io.compress.
>> BlockDecompressorStream#1
>> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>>Local Variable: org.apache.hadoop.mapreduce.li
>> b.input.SplitLineReader#1
>>Local Variable: org.apache.hadoop.io.Text#5
>> 

Re: Use of Accumulators

2017-11-14 Thread Kedarnath Dixit

Yes!
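
To make it concrete, this is roughly what I have in mind (a minimal sketch only,
using a LongAccumulator as the toggle; jsc, input, needsChange and transform are
placeholders for our JavaSparkContext, source RDD and processing logic):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.util.LongAccumulator;

// Sketch: use a LongAccumulator as a "changed" flag. Updates made inside a
// transformation can be re-applied if a task is retried, so the flag is only
// read back on the driver after an action has completed.
LongAccumulator changed = jsc.sc().longAccumulator("variable2Changed");

JavaRDD<String> processed = input.map(record -> {
    if (needsChange(record)) {   // placeholder predicate
        changed.add(1);          // any non-zero value means "changed"
    }
    return transform(record);    // placeholder transformation
});

processed.count();   // run an action so the accumulator gets populated

boolean variable2Flag = changed.value() > 0;   // read the toggle on the driver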



Thanks!
~Kedar Dixit
Bigdata Analytics at Persistent Systems Ltd.


From: Holden Karau 
Sent: 14 November 2017 20:04:50
To: Kedarnath Dixit
Cc: user@spark.apache.org
Subject: Re: Use of Accumulators

And where do you want to read the toggle back from? On the driver?

On Tue, Nov 14, 2017 at 3:52 AM Kedarnath Dixit 
> wrote:

Hi,



Inside the transformation, if there is any change to the variable's associated
data, I want to simply toggle a flag indicating that something changed while
processing the data.

Please let me know if we can do this at runtime.





Thanks!

~Kedar Dixit

Bigdata Analytics at Persistent Systems Ltd.



From: Holden Karau [via Apache Spark User List] 
[mailto:ml+s1001560n29995...@n3.nabble.com]
Sent: Tuesday, November 14, 2017 1:16 PM
To: Kedarnath Dixit 
>

Subject: Re: Use of Accumulators



So you want to set an accumulator to 1 after a transformation has fully 
completed? Or what exactly do you want to do?



On Mon, Nov 13, 2017 at 9:47 PM vaquar khan <[hidden 
email]> wrote:

Confirmed, you can use Accumulators :)



Regards,

Vaquar khan

On Mon, Nov 13, 2017 at 10:58 AM, Kedarnath Dixit <[hidden 
email]> wrote:

Hi,



We need some way to toggle the flag of a variable in a transformation.

We are thinking of making use of Spark Accumulators for this purpose.

Can we use these as below:

Variables  -> Initial Value

 Variable1 -> 0

 Variable2 -> 0

In one of the transformations we need to set Variable2's value to 1. Can we
achieve this using Accumulators? Please confirm.



Thanks!



With Regards,

~Kedar Dixit



[hidden email] | @kedarsdixit | M +91 90499 15588 | T +91 (20) 6703 4783

Persistent Systems | Partners In Innovation | www.persistent.com


--

Regards,

Vaquar Khan

+1 -224-436-0783

Greater Chicago

--

Twitter: https://twitter.com/holdenkarau






--
Twitter: https://twitter.com/holdenkarau


Re: Use of Accumulators

2017-11-14 Thread Holden Karau
And where do you want to read the toggle back from? On the driver?

On Tue, Nov 14, 2017 at 3:52 AM Kedarnath Dixit <
kedarnath_di...@persistent.com> wrote:

> Hi,
>
>
>
> Inside the transformation if there is any change for the Variable’s
> associated data, I want to just toggle it saying there is some change while
> processing the data.
>
>
>
> Please let me know if we can runtime do this.
>
>
>
>
>
> Thanks!
>
> *~Kedar Dixit*
>
> Bigdata Analytics at Persistent Systems Ltd.
>
>
>
> *From:* Holden Karau [via Apache Spark User List] [
> mailto:ml+s1001560n29995...@n3.nabble.com
> ]
> *Sent:* Tuesday, November 14, 2017 1:16 PM
> *To:* Kedarnath Dixit 
>
>
> *Subject:* Re: Use of Accumulators
>
>
>
> So you want to set an accumulator to 1 after a transformation has fully
> completed? Or what exactly do you want to do?
>
>
>
> On Mon, Nov 13, 2017 at 9:47 PM vaquar khan <[hidden email]
> > wrote:
>
> Confirmed ,you can use Accumulators :)
>
>
>
> Regards,
>
> Vaquar khan
>
> On Mon, Nov 13, 2017 at 10:58 AM, Kedarnath Dixit <[hidden email]
> > wrote:
>
> Hi,
>
>
>
> We need some way to toggle the flag of  a variable in transformation.
>
>
>
> We are thinking to make use of spark  Accumulators for this purpose.
>
>
>
> Can we use these as below:
>
>
>
> Variables  -> Initial Value
>
>  Variable1 -> 0
>
>  Variable2 -> 0
>
>
>
> In one of the transformations if we need to make Variable2's value to 1.
> Can we achieve this using Accumulators? Please confirm.
>
>
>
> Thanks!
>
>
>
> With Regards,
>
> *~Kedar Dixit*
>
>
>
> [hidden email] | @kedarsdixit | M +91 90499 15588 | T +91 (20) 6703 4783
>
> *Persistent Systems | **Partners In Innovation** | www.persistent.com*
>
>
> --
>
> Regards,
>
> Vaquar Khan
>
> +1 -224-436-0783
>
> Greater Chicago
>
> --
>
> Twitter: https://twitter.com/holdenkarau
>
>
>
-- 
Twitter: https://twitter.com/holdenkarau


Spark Structured Streaming + Kafka

2017-11-14 Thread Agostino Calamita
Hi,
I have a problem with Structured Streaming and Kafka.
I have 2 brokers and a topic with 8 partitions and replication factor 2.

This is my driver program:

public static void main(String[] args)
{
    SparkSession spark = SparkSession
            .builder()
            .appName("StreamFromKafka")
            .config("spark.sql.streaming.minBatchesToRetain", 5)
            .getOrCreate();

    spark.sessionState().conf().setConf(
            org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS(), 8);

    Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "host100:9092,host200:9092")
            .option("subscribe", "acrmonitor")
            .option("startingOffsets", "earliest")
            .load();

    String windowSize = "10 minutes";
    String slide = "10 minutes";
    String startTime = "0 minutes";

    Dataset<Row> df3 = df.select(
                get_json_object(col("value").cast("string"), "$.address").alias("address"),
                get_json_object(col("value").cast("string"), "$.type").alias("type"),
                get_json_object(col("value").cast("string"), "$.insertDate").alias("insertDate").cast("timestamp"))
            .withWatermark("insertDate", "15 minutes")
            .groupBy(
                col("address"),
                col("type"),
                window(col("insertDate").cast("timestamp"), windowSize, slide, startTime))
            .count();

    String chkptDir = "/tmp/checkPoint";

    StreamingQuery query = df3
            .writeStream()
            .outputMode(OutputMode.Update())
            .option("checkpointLocation", chkptDir)
            .foreach(new JDBCsink())
            .trigger(Trigger.ProcessingTime(3))
            .start();

    try {
        query.awaitTermination();
    } catch (Exception e) {
        System.err.println("ERROR: " + e.getMessage());
    }

    spark.cloneSession();
}

I use a checkpoint directory.

When I stop one Kafka broker (the other one remains alive), the driver program
stops reading messages from the topic and processing them. I waited for more
than 5 minutes.
If I restart the driver program (with one broker still down), it reads and
processes messages.

If I stop one Kafka broker (so the driver stops reading) and after a few minutes
restart the broker, then the driver program begins to read and process messages
again (without restarting the driver program).

Is there a way to let the driver program keep reading the Kafka topic when only
one of the brokers goes down, without restarting the driver?

Thanks.





Re: Measuring cluster utilization of a streaming job

2017-11-14 Thread Teemu Heikkilä
Without knowing anything about your pipeline, the best estimate of the resources
needed comes from running the job at the same ingestion rate as your normal
production load.

With Kafka you can enable back pressure, so under high load your latency will
just increase and you don't have to provision capacity for the spikes. If you
want, you can then e.g. autoscale the cluster to respond to the load.
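
For the back pressure part, a minimal sketch of the relevant settings (assuming
the DStream-based Kafka integration; the values are only illustrative):

import org.apache.spark.SparkConf;

// Illustrative values: enable rate-based back pressure and cap the per-partition
// rate so an existing backlog in Kafka doesn't flood the first batches.
SparkConf conf = new SparkConf()
        .set("spark.streaming.backpressure.enabled", "true")
        .set("spark.streaming.backpressure.initialRate", "1000")
        .set("spark.streaming.kafka.maxRatePerPartition", "1000");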

If you are using YARN you can isolate and limit resources, so you can also run
other workloads in the same cluster if you need a lot of elasticity.

Usually with streaming jobs the concerns are not with computing capacity but 
more with network bandwidth and memory consumption.


> On 14.11.2017, at 14.54, Nadeem Lalani  wrote:
> 
> Hi,
> 
> I was wondering if anyone has done some work around measuring the cluster 
> resource utilization of a "typical" spark streaming job.
> 
> We are trying to build a message ingestion system which will read from Kafka 
> and do some processing.  We have had some concerns raised in the team that a 
> 24*7 streaming job might not be the best use of cluster resources especially 
> when our use cases are to process data in a micro batch fashion and are not 
> truly streaming.
> 
> We wanted to measure  as to how much resource does a spark streaming process 
> take. Any pointers on where one would start?
> 
> We are on Yarn and plan to use spark 2.1
> 
> Thanks in advance,
> Nadeem 





Measuring cluster utilization of a streaming job

2017-11-14 Thread Nadeem Lalani
Hi,

I was wondering if anyone has done some work around measuring the cluster
resource utilization of a "typical" spark streaming job.

We are trying to build a message ingestion system which will read from
Kafka and do some processing.  We have had some concerns raised in the team
that a 24*7 streaming job might not be the best use of cluster resources
especially when our use cases are to process data in a micro batch fashion
and are not truly streaming.

We wanted to measure how much resource a Spark streaming process takes. Any
pointers on where one would start?

We are on Yarn and plan to use spark 2.1

Thanks in advance,
Nadeem


RE: Use of Accumulators

2017-11-14 Thread Kedarnath Dixit
Hi,

Inside the transformation, if there is any change to the variable's associated
data, I want to simply toggle a flag indicating that something changed while
processing the data.

Please let me know if we can do this at runtime.


Thanks!
~Kedar Dixit
Bigdata Analytics at Persistent Systems Ltd.

From: Holden Karau [via Apache Spark User List] 
[mailto:ml+s1001560n29995...@n3.nabble.com]
Sent: Tuesday, November 14, 2017 1:16 PM
To: Kedarnath Dixit 
>
Subject: Re: Use of Accumulators

So you want to set an accumulator to 1 after a transformation has fully 
completed? Or what exactly do you want to do?

On Mon, Nov 13, 2017 at 9:47 PM vaquar khan <[hidden 
email]> wrote:
Confirmed, you can use Accumulators :)

Regards,
Vaquar khan

On Mon, Nov 13, 2017 at 10:58 AM, Kedarnath Dixit <[hidden 
email]> wrote:

Hi,



We need some way to toggle the flag of a variable in a transformation.

We are thinking of making use of Spark Accumulators for this purpose.

Can we use these as below:

Variables  -> Initial Value

 Variable1 -> 0

 Variable2 -> 0

In one of the transformations we need to set Variable2's value to 1. Can we
achieve this using Accumulators? Please confirm.



Thanks!



With Regards,

~Kedar Dixit


[hidden email] | @kedarsdixit | M +91 90499 15588 | T +91 (20) 6703 4783

Persistent Systems | Partners In Innovation | www.persistent.com



--
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago
--
Twitter: https://twitter.com/holdenkarau

