Re: Core allocation is scattered

2019-07-25 Thread Srikanth Sriram
s like node1=16 cores > and node2=4 cores, but cores are allocated like node1=2 > node2=1 ... node14=1, like that. Is there any conf property I need to > change? I know with dynamic allocation we can use the below, but without dynamic > allocation is there any? > --conf "spark.dynamicAllocation.maxExecutors=2" > > Thanks > Amit > -- Regards, Srikanth Sriram
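
A minimal sketch of the standalone-mode settings usually involved in packing cores onto fewer nodes; the property names are real Spark configs, but the values are illustrative and not from this thread:

    import org.apache.spark.{SparkConf, SparkContext}

    // Cap total cores for the app and cores per executor so executors land on fewer nodes.
    val conf = new SparkConf()
      .setAppName("core-allocation")
      .set("spark.cores.max", "8")        // total cores the application may take
      .set("spark.executor.cores", "4")   // cores per executor
    val sc = new SparkContext(conf)
    // On a standalone master, spark.deploy.spreadOut=false (a master-side setting,
    // not an application conf) switches from spreading cores across workers to
    // consolidating them on as few workers as possible.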

Task failure to read input files

2018-04-13 Thread Srikanth
y to print the full file name when such failures happen? So that I can then manually check if the file is indeed corrupted. Thanks, Srikanth
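
Not from the thread, but one hedged workaround for tracing failures back to a file is to tag each row with its source via input_file_name(); the path below is a placeholder, and spark.sql.files.ignoreCorruptFiles is a separate knob if the goal is merely to skip bad files:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.input_file_name

    val spark = SparkSession.builder().appName("trace-input-files").getOrCreate()
    // Carry the originating file name alongside every record read.
    val df = spark.read.text("s3a://bucket/input/*.gz")       // placeholder path
      .withColumn("source_file", input_file_name())
    df.show(5, truncate = false)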

Spark ML DAG Pipelines

2017-09-07 Thread Srikanth Sampath
Hi Spark Experts, Can someone point me to some examples for non-linear (DAG) ML pipelines. That would be of great help. Thanks much in advance -Srikanth
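
A small hedged sketch of how a non-linear pipeline tends to look in practice: the stages are still passed as a linear array, but the effective graph is a DAG because each stage declares its own input/output columns (here a text branch and an assumed numeric column "clicks" merge at a VectorAssembler):

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer, VectorAssembler}

    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val tf        = new HashingTF().setInputCol("words").setOutputCol("tf")
    val assembler = new VectorAssembler()
      .setInputCols(Array("tf", "clicks"))   // "clicks" is an assumed numeric input column
      .setOutputCol("features")
    val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")

    // Linear stage list, DAG-shaped column dependencies.
    val pipeline = new Pipeline().setStages(Array(tokenizer, tf, assembler, lr))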

Spark streaming app leaking memory?

2017-05-16 Thread Srikanth
] 17/05/16 15:12:02 WARN Executor: 1 block locks were not released by TID = 7807: [rdd_1_39] I notice that "Managed memory leak" logs are not seen when I use G1GC. Srikanth

Re: Spark streaming + kafka error with json library

2017-03-30 Thread Srikanth
Thanks for the tip. That worked. When would one use the assembly? On Wed, Mar 29, 2017 at 7:13 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly) > > On Wed, Mar 29, 2017 at 9:59 AM, Srikan
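
For reference, a hedged build.sbt sketch of the dependency being recommended here (the plain artifact, not the -assembly); 2.1.0 is the Spark version mentioned in the thread, adjust as needed:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"            % "2.1.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"   // not the -assembly artifact
    )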

Spark streaming + kafka error with json library

2017-03-29 Thread Srikanth
rs\stati\.ivy2\cache\org.scala-lang.modules\scala-parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling json4s-native. Any idea how to resolve this? I'm using spark version 2.1.0 Thanks, Srikanth

Re: Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
this time. So will driver fail and exit in such cases? I've seen drivers exit after a job has hit max retry attempts. This is different though rt? Srikanth On Tue, Feb 7, 2017 at 5:25 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > Does restarting after a few minutes solves th

Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
? Regards, Srikanth

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-21 Thread Srikanth
o make a note of that in the docs. > > On Thu, Oct 20, 2016 at 12:13 PM, Srikanth <srikanth...@gmail.com> wrote: > > Yeah, setting those params helped. > > > > On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger <c...@koeninger.org> > wrote: > >> > >>

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-20 Thread Srikanth
sion.timeout.ms, heartbeat.interval.ms, or related > configs? > > On Wed, Oct 19, 2016 at 12:22 PM, Srikanth <srikanth...@gmail.com> wrote: > > Bringing this thread back as I'm seeing this exception on a production > kafka > > cluster. > > > > I have two Spark streaming
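
A hedged sketch of the consumer settings being referred to (session.timeout.ms and heartbeat.interval.ms); the broker address, group id and timeout values are placeholders, and session.timeout.ms must stay within the broker's group.min/max.session.timeout.ms range:

    import org.apache.kafka.common.serialization.StringDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"     -> "broker1:9092",              // placeholder
      "key.deserializer"      -> classOf[StringDeserializer],
      "value.deserializer"    -> classOf[StringDeserializer],
      "group.id"              -> "my-stream",                 // placeholder
      "session.timeout.ms"    -> "30000",
      "heartbeat.interval.ms" -> "10000"                      // typically ~1/3 of the session timeout
    )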

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-19 Thread Srikanth
omething you want > to do with the new kafka consumer. > > > As far as the original issue, are you seeing those polling errors > intermittently, or consistently? From your description, it sounds > like retry is working correctly. > > > On Wed, Sep 7, 2016 at 2:37 PM,

Re: Spark with S3 DirectOutputCommitter

2016-09-12 Thread Srikanth
files, rename from _temporary is just not practical in S3. I guess we have to add another stage with S3Distcp?? Srikanth On Sun, Sep 11, 2016 at 2:34 PM, Steve Loughran <ste...@hortonworks.com> wrote: > > > On 9 Sep 2016, at 21:54, Srikanth <srikanth...@gmail.com> wrote: >

Spark with S3 DirectOutputCommitter

2016-09-09 Thread Srikanth
mapreduce.use.directfileoutputcommitter", "true") //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName) .getOrCreate() Srikanth
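
A hedged reconstruction of the builder being quoted above; DirectOutputCommitter is assumed to be a third-party/user-supplied class (hence left commented out, as in the original), and direct output committers were later removed from Spark, so treat this purely as a sketch of the settings under discussion:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("s3-direct-commit")
      .config("spark.hadoop.mapreduce.use.directfileoutputcommitter", "true")
      //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName)
      .getOrCreate()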

"Job duration" and "Processing time" don't match

2016-09-08 Thread Srikanth
FORMED", "delimiter" -> "\t", "header" -> "false")) .partitionBy("entityId", "regionId", "eventDate") .save(outputPath) Removing SaveMode.Append really speeds things up and also the mismatch between Job duration and processing time disappears. I'm not able to explain what is causing this though. Srikanth

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
Yea, disabling cache was not going to be my permanent solution either. I was going to ask how big an overhead is that? It happens intermittently and each time it happens retry is successful. Srikanth On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote: > That's n

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote: > > I had a look at the executor logs and noticed that this exception happens > > only when using the cached consumer. > > Every retry is successful. This is consistent. > > One possibility is tha

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
Executor: Finished task 1.1 in stage 138.0 (TID 7854). 1103 bytes result sent to driver On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote: > Thanks Cody. Setting poll timeout helped. > Our network is fine but brokers are not fully provisioned in test cluster. > But

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Srikanth
tart that job back up, and it errors because the last committed > offset is no longer available > - you think that instead of erroring, the job should silently restart > based on the value of auto.offset.reset > > Is that accurate? > > > On Wed, Sep 7, 2016 at 10:44 AM, S

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Srikanth
reset. > > If your retention is so low that retention gets expired in between > when the driver created a batch with a given starting offset, and when > an executor starts to process that batch, you're going to have > problems. > > On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <sr

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-06 Thread Srikanth
ot sufficient. > > On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote: > > You are right. I got confused as it's all part of the same log when running from the IDE. > > I was looking for a good guide to read to understand this integ.

Reset auto.offset.reset in Kafka 0.10 integ

2016-09-02 Thread Srikanth
github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160 How to force it to restart in this case (fully aware of potential data loss)? Srikanth
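
One commonly suggested workaround (not necessarily what was recommended in this thread): start with a group id that has no committed offsets, so the position falls back to auto.offset.reset; the broker and group id below are placeholders:

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",              // placeholder
      "group.id"           -> "my-stream-restarted",       // a fresh group id has no stored offsets
      "auto.offset.reset"  -> "earliest",                  // or "latest"; either accepts the data loss
      "enable.auto.commit" -> (false: java.lang.Boolean)
      // plus the usual key/value deserializer settings
    )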

Re: Spark 2.0 with Kafka 0.10 exception

2016-08-24 Thread Srikanth
Thanks Cody. Setting poll timeout helped. Our network is fine but brokers are not fully provisioned in test cluster. But there isn't enough load to max out on broker capacity. Curious that kafkacat running on the same node doesn't have any issues. Srikanth On Tue, Aug 23, 2016 at 9:52 PM, Cody
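
The poll timeout referred to here is, to the best of my knowledge, spark.streaming.kafka.consumer.poll.ms (how long the executor-side consumer waits on poll before giving up); the value below is purely illustrative:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-direct")
      .set("spark.streaming.kafka.consumer.poll.ms", "10000")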

Spark 2.0 with Kafka 0.10 exception

2016-08-23 Thread Srikanth
imes a few batches are scheduled and run fine. Then I get this error. kafkacat is able to fetch from this topic continuously. Full exception is here -- https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767 Srikanth

Re: Rebalancing when adding kafka partitions

2016-08-16 Thread Srikanth
tern pattern to subscribe to > * @param kafkaParams Kafka Who does the new partition discovery? The underlying Kafka consumer or spark-streaming-kafka-0-10-assembly? Srikanth On Fri, Aug 12, 2016 at 5:15 PM, Cody Koeninger <c...@koeninger.org> wrote: > Hrrm, that's interesting. Di

Re: Rebalancing when adding kafka partitions

2016-08-12 Thread Srikanth
partitions.") Should I be setting some parameter/config? Is the doc for new integ available? Thanks, Srikanth On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <c...@koeninger.org> wrote: > No, restarting from a checkpoint won't do it, you need to re-define the > stream. &

Re: Rebalancing when adding kafka partitions

2016-07-22 Thread Srikanth
Yeah, that's what I thought. We need to redefine not just restart. Thanks for the info! I do see the usage of subscribe[K,V] in your DStreams example. Looks simple but its not very obvious how it works :-) I'll watch out for the docs and ScalaDoc. Srikanth On Fri, Jul 22, 2016 at 2:15 PM, Cody

Re: Rebalancing when adding kafka partitions

2016-07-22 Thread Srikanth
subscribepattern, it should pick up new partitions as they are > added. > > On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <srikanth...@gmail.com> wrote: > > Hello, > > > > I'd like to understand how Spark Streaming(direct) would handle Kafka > > partition additio
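
A hedged sketch of the SubscribePattern strategy being referred to; the topic regex, broker list, group id and batch interval are all placeholders:

    import java.util.regex.Pattern
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    val ssc = new StreamingContext(new SparkConf().setAppName("pattern-sub"), Seconds(30))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "pattern-sub")

    // Topics matching the pattern (and their partitions) are picked up as they appear.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("events-.*"), kafkaParams))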

Rebalancing when adding kafka partitions

2016-07-22 Thread Srikanth
Hello, I'd like to understand how Spark Streaming(direct) would handle Kafka partition addition? Will a running job be aware of new partitions and read from it? Since it uses Kafka APIs to query offsets and offsets are handled internally. Srikanth

Re: Streaming with broadcast joins

2016-02-20 Thread Srikanth
Sebastian, *Update:-* This is not possible. Probably will remain this way for the foreseeable future. https://issues.apache.org/jira/browse/SPARK-3863 Srikanth On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu <sebastian@gmail.com> wrote: > I don't have the code with me now, an

Re: Streaming with broadcast joins

2016-02-19 Thread Srikanth
tian@gmail.com> > wrote: > > I don't have the code with me now, and I ended moving everything to RDD in > the end and using map operations to do some lookups, i.e. instead of > broadcasting a Dataframe I ended broadcasting a Map > > > On Fri, Feb 19, 2016 at 11:39 AM Sr

Re: Streaming with broadcast joins

2016-02-19 Thread Srikanth
Hmmm..OK. Srikanth On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu <sebastian@gmail.com> wrote: > I don't have the code with me now, and I ended moving everything to RDD in > the end and using map operations to do some lookups, i.e. instead of > broadcasting a Dataframe I end

Re: listening to recursive folder structures in s3 using pyspark streaming (textFileStream)

2016-02-19 Thread Srikanth
Apparently you can pass comma separated folders. Try the suggestion given here --> http://stackoverflow.com/questions/29426246/spark-streaming-textfilestream-not-supporting-wildcards Let me know if this helps Srikanth On Wed, Feb 17, 2016 at 5:47 PM, Shixiong(Ryan) Zhu <shixi...@databric

Re: Streaming with broadcast joins

2016-02-19 Thread Srikanth
an? did it fail? it didnt broadcast? > > On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote: > >> Code with SQL broadcast hint. This worked and I was able to see that >> broadcastjoin was performed. >> >> val testDF = sqlContext.read.format(&q

Re: Streaming with broadcast joins

2016-02-18 Thread Srikanth
utput/streaming/"+timestamp) } On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian@gmail.com> wrote: > Can you paste the code where you use sc.broadcast ? > > On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote: > >> Sebastian

Re: Streaming with broadcast joins

2016-02-18 Thread Srikanth
and that did not provide this behavior. Srikanth On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian@gmail.com> wrote: > You should be able to broadcast that data frame using sc.broadcast and > join against it. > > On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.c

Streaming with broadcast joins

2016-02-17 Thread Srikanth
Hello, I have a streaming use case where I plan to keep a dataset broadcasted and cached on each executor. Every micro batch in streaming will create a DF out of the RDD and join the batch. The below code will perform the broadcast operation for each RDD. Is there a way to broadcast it just once?
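
A hedged sketch of the pattern this thread settles on (the broadcast hint applied per batch; per SPARK-3863 there was no way at the time to broadcast the DataFrame exactly once for the stream's lifetime). It assumes sqlContext, a case class Event(id: String, value: Double) and a DStream[Event] named dstream already exist; all paths are placeholders:

    import org.apache.spark.sql.functions.broadcast

    // Small lookup table, read once and cached on first use.
    val lookupDF = sqlContext.read.json("/shared/meta/lookup.jsonl").cache()

    dstream.foreachRDD { rdd =>
      val batchDF = sqlContext.createDataFrame(rdd)             // DF for this micro batch
      batchDF.join(broadcast(lookupDF), "id")                   // hint re-applied every batch
        .write.mode("append").parquet("/output/streaming/")     // illustrative sink
    }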

spark-csv partitionBy

2016-02-09 Thread Srikanth
icks.spark.csv").save(s"hdfs:///output/id=$id/") }) This approach doesn't scale well. Especially since no.of unique IDs can be between 500-700. And adding a second partition column will make this even worst. Wondering if anyone has an efficient work around? Srikanth

Re: Broadcast join on multiple dataframes

2016-02-04 Thread Srikanth
Hello, Any pointers on what is causing the optimizer to convert broadcast to shuffle join? This join is with a file that is just 4kb in size. Complete plan --> https://www.dropbox.com/s/apuomw1dg0t1jtc/plan_with_select.txt?dl=0 DAG from UI -->

Re: Broadcast join on multiple dataframes

2016-01-29 Thread Srikanth
Michael, Output of DF.queryExecution is saved to https://www.dropbox.com/s/1vizuwpswza1e3x/plan.txt?dl=0 I don't see anything in this to suggest a switch in strategy. Hopefully you find this helpful. Srikanth On Thu, Jan 28, 2016 at 4:43 PM, Michael Armbrust <mich...@databricks.com>

Broadcast join on multiple dataframes

2016-01-28 Thread Srikanth
lse,,, : : : +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl : : +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl : +- ConvertToUnsafe : +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false, +- ConvertToUnsafe +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false Srikanth

Adding additional jars to distributed cache (yarn-client)

2015-09-07 Thread Srikanth Sundarrajan
just add the SPARK_JAR and APP_JAR. Am wondering what is the best way to add additional files to Distributed cache and also have them appear in the classpath for ExecutorLauncher. Thanks Srikanth Sundarrajan

Dataframe collect() work but count() fails

2015-08-26 Thread Srikanth
(InMemoryColumnarTableScan.scala:124) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277) Any idea what is wrong here? Srikanth

Re: Estimate size of Dataframe programmatically

2015-08-10 Thread Srikanth
spark.sql.autoBroadcastJoinThreshold estimates size of dataframe. Is it going to broadcast when the columnar storage size is less than 10 MB? Srikanth On Fri, Aug 7, 2015 at 2:51 PM, Ted Yu yuzhih...@gmail.com wrote: Have you tried calling SizeEstimator.estimate() on a DataFrame? I did the following

Estimate size of Dataframe programmatically

2015-08-07 Thread Srikanth
. With dataframes, it's tricky due to columnar storage. How do we do it? On a related note, I see the size of the RDD object to be ~60MB. Is that the footprint of the RDD in the driver JVM? scala> val temp = sc.parallelize(Array(1,2,3,4,5,6)) scala> SizeEstimator.estimate(temp) res13: Long = 69507320 Srikanth
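
A hedged note on the number above: SizeEstimator.estimate reports the JVM footprint of the driver-side object graph it is handed (here the RDD object itself, not the distributed data), which is why a six-element array appears to cost ~66 MB; the ~10 MB broadcast decision is driven by plan statistics via spark.sql.autoBroadcastJoinThreshold, not by this call:

    import org.apache.spark.util.SizeEstimator

    val temp = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
    // Footprint of the RDD object graph on the driver, not the size of its data.
    println(SizeEstimator.estimate(temp))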

How does DataFrame except work?

2015-08-03 Thread Srikanth
if second dataframe is small enough? Srikanth

spark-csv number of partitions

2015-07-28 Thread Srikanth
Hello, I'm using spark-csv instead of sc.textFile() to work with CSV files. How can I set the number of partitions that will be created when reading a CSV? Basically an equivalent for minPartitions in textFile(): val myrdd = sc.textFile("my.csv", 24) Srikanth
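
A hedged workaround sketch (spark-csv did not expose a minPartitions-style option, as far as I know): repartition after the load, accepting the extra shuffle; the file name and partition count are placeholders:

    val df = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .load("my.csv")
      .repartition(24)   // roughly the equivalent of minPartitions, at the cost of a shuffle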

spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
. Is this a bug? This is with Spark 1.4. [image: Inline image 1] Srikanth

Re: spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
Cool. Thanks! Srikanth On Wed, Jul 22, 2015 at 3:12 PM, Andrew Or and...@databricks.com wrote: Hi Srikanth, I was able to reproduce the issue by setting `spark.cores.max` to a number greater than the number of cores on a worker. I've filed SPARK-9260 which I believe is already being fixed

ShuffledHashJoin instead of CartesianProduct

2015-07-22 Thread Srikanth
) Is there a more elegant way to do this? On a related note, has anyone worked on record linkage using Bloom Filters, Levenshtein distance, etc in Spark? Srikanth

Re: RowId unique key for Dataframes

2015-07-21 Thread Srikanth
Will work. Thanks! zipWithUniqueId() doesn't guarantee continuous ID either. Srikanth On Tue, Jul 21, 2015 at 9:48 PM, Burak Yavuz brk...@gmail.com wrote: Would monotonicallyIncreasingId https://github.com/apache/spark/blob/d4c7a7a3642a74ad40093c96c4bf45a62a470605/sql/core/src/main/scala/org
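
For reference, a hedged sketch of the suggestion (Spark 1.4-era spelling; later versions rename it monotonically_increasing_id). The IDs are unique and increasing but not contiguous, since each partition gets its own range; `df` is assumed to exist:

    import org.apache.spark.sql.functions.monotonicallyIncreasingId

    val withId = df.withColumn("row_id", monotonicallyIncreasingId())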

Re: HiveThriftServer2.startWithContext error with registerTempTable

2015-07-16 Thread Srikanth
Method) Srikanth On Thu, Jul 16, 2015 at 12:44 AM, Cheng, Hao hao.ch...@intel.com wrote: Have you ever tried querying “select * from temp_table” from the spark shell? Or can you try the option --jars while starting the spark shell? *From:* Srikanth [mailto:srikanth...@gmail.com] *Sent

Re: HiveThriftServer2.startWithContext error with registerTempTable

2015-07-15 Thread Srikanth
Hello, Re-sending this to see if I'm second time lucky! I've not managed to move past this error. Srikanth On Mon, Jul 13, 2015 at 9:14 PM, Srikanth srikanth...@gmail.com wrote: Hello, I want to expose result of Spark computation to external tools. I plan to do this with Thrift server JDBC

cache() VS cacheTable()

2015-07-13 Thread Srikanth
cache. I thought this difference is due to columnar format used by dataframe. As per the statement in the book, cache size should be similar. Srikanth

HiveThriftServer2.startWithContext error with registerTempTable

2015-07-13 Thread Srikanth
the other table(my_table) from Beeline though. Any suggestions on how to overcome this? This is with the Spark 1.4 pre-built version. Spark-shell was started with --packages to pass spark-csv. Srikanth
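
A hedged sketch of the overall pattern this thread is after (Spark 1.4 era): start the Thrift server from the same context that registers the temp table, so external JDBC tools can see it. The CSV path is a placeholder and the spark-csv package is assumed to be on the classpath:

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

    val hiveContext = new HiveContext(sc)
    val df = hiveContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .load("/shared/data/my.csv")                 // placeholder path
    df.registerTempTable("temp_table")
    HiveThriftServer2.startWithContext(hiveContext)
    // then from Beeline: !connect jdbc:hive2://localhost:10000 and select * from temp_table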

Re: How do we control output part files created by Spark job?

2015-07-11 Thread Srikanth
spark.sql.shuffle.partitions=10 I think reducing shuffle partitions will slow down my group by query in hiveContext, or won't it slow it down? Please guide. On Sat, Jul 11, 2015 at 7:41 AM, Srikanth srikanth...@gmail.com wrote: Is there a join involved in your sql? Have a look at spark.sql.shuffle.partitions

Re: How do we control output part files created by Spark job?

2015-07-10 Thread Srikanth
Is there a join involved in your sql? Have a look at spark.sql.shuffle.partitions? Srikanth On Wed, Jul 8, 2015 at 1:29 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Srikant thanks for the response. I have the following code: hiveContext.sql(insert into... ).coalesce(6) Above code does

Re: Parallelizing multiple RDD / DataFrame creation in Spark

2015-07-08 Thread Srikanth
Your tableLoad() APIs are not actions. File will be read fully only when an action is performed. If the action is something like table1.join(table2), then I think both files will be read in parallel. Can you try that and look at the execution plan or in 1.4 this is shown in Spark UI. Srikanth

Re: How do we control output part files created by Spark job?

2015-07-07 Thread Srikanth
Did you do yourRdd.coalesce(6).saveAsTextFile() or yourRdd.coalesce(6) yourRdd.saveAsTextFile() ? Srikanth On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi I tried both approach using df. repartition(6
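
To make the distinction explicit, a small illustration (paths are placeholders; `yourRdd` stands for any RDD[String]): coalesce returns a new RDD rather than modifying the one it is called on:

    val yourRdd = sc.textFile("/input/path")

    yourRdd.coalesce(6).saveAsTextFile("/output/six-parts")    // writes 6 part files

    yourRdd.coalesce(6)                                        // result discarded
    yourRdd.saveAsTextFile("/output/unchanged")                // original partition count preserved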

Re: BroadcastHashJoin when RDD is not cached

2015-07-02 Thread Srikanth
/core/src/main/scala/org/apache/spark/sql/functions.scala#L581 ) On Wed, Jul 1, 2015 at 8:30 AM, Srikanth srikanth...@gmail.com wrote: Hello, I have a straight forward use case of joining a large table with a smaller table. The small table is within the limit I set

BroadcastHashJoin when RDD is not cached

2015-07-01 Thread Srikanth
#23,ip#18,scalaUDF(date#20) AS lowerTime#22] PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41 Srikanth

Re: Spark 1.4 RDD to DF fails with toDF()

2015-06-29 Thread Srikanth
My error was related to Scala version. Upon further reading, I realized that it takes some effort to get Spark working with Scala 2.11. I've reverted to using 2.10 and moved past that error. Now I hit the issue you mentioned. Waiting for 1.4.1. Srikanth On Fri, Jun 26, 2015 at 9:10 AM, Roberto

Re: Spark 1.4 RDD to DF fails with toDF()

2015-06-26 Thread Srikanth
26, 2015 at 7:05 AM, stati srikanth...@gmail.com wrote: Hello, When I run a spark job with spark-submit it fails with below exception for code line /*val webLogDF = webLogRec.toDF().select(ip, date, name)*/ I had similar issue running from spark-shell, then realized that I needed

Re: java.lang.OutOfMemoryError: PermGen space

2015-06-24 Thread Srikanth
That worked. Thanks! I wonder what changed in 1.4 to cause this. It wouldn't work with anything less than 256m for a simple piece of code. 1.3.1 used to work with the default (64m, I think). Srikanth On Wed, Jun 24, 2015 at 12:47 PM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Did you try
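
For reference, a hedged sketch of the kind of setting that resolves this on Spark 1.4 / Java 7 (PermGen no longer exists on Java 8+); 256m is the value that worked in this thread, and for a client-mode driver the option has to be supplied at launch, e.g. in spark-defaults.conf or via --driver-java-options:

    # spark-defaults.conf
    spark.driver.extraJavaOptions    -XX:MaxPermSize=256m
    spark.executor.extraJavaOptions  -XX:MaxPermSize=256m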