Re: Benchmarking col vs row similarities
I will increase memory for the job... that will also fix it, right?

On Apr 10, 2015 12:43 PM, Reza Zadeh r...@databricks.com wrote:

You should pull in this PR: https://github.com/apache/spark/pull/5364 It should resolve that. It is in master.

Best,
Reza
Re: Benchmarking col vs row similarities
Depends... The heartbeat warning you received happens due to GC pressure (probably a Full GC). If you increase the memory too much, the GCs may be less frequent, but the Full GCs may take longer. Try increasing the following confs:

    spark.executor.heartbeatInterval
    spark.core.connection.ack.wait.timeout

Best,
Burak

On Fri, Apr 10, 2015 at 8:52 PM, Debasish Das debasish.da...@gmail.com wrote:

I will increase memory for the job... that will also fix it, right?

On Apr 10, 2015 12:43 PM, Reza Zadeh r...@databricks.com wrote:

You should pull in this PR: https://github.com/apache/spark/pull/5364 It should resolve that. It is in master.

Best,
Reza
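For reference, a minimal sketch of one way to set the two configs Burak mentions when building the SparkContext (the values are illustrative, not from the thread, and the expected units have changed across Spark releases, so check the configuration docs for the version you run):

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative values only: a longer heartbeat interval and ack timeout make
    // long GC pauses less likely to trip the timeouts reported in this thread.
    val conf = new SparkConf()
      .setAppName("similarity-benchmark")
      .set("spark.executor.heartbeatInterval", "60000")      // assumed to be milliseconds here
      .set("spark.core.connection.ack.wait.timeout", "600")  // assumed to be seconds here
    val sc = new SparkContext(conf)

The same keys can also be passed to spark-submit with --conf key=value, which is usually the easier route on YARN.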
Re: Benchmarking col vs row similarities
You should pull in this PR: https://github.com/apache/spark/pull/5364 It should resolve that. It is in master.

Best,
Reza

On Fri, Apr 10, 2015 at 8:32 AM, Debasish Das debasish.da...@gmail.com wrote:

Hi,

I am benchmarking the row vs col similarity flow on 60M x 10M matrices... Details are in this JIRA: https://issues.apache.org/jira/browse/SPARK-4823

For testing I am using the Netflix data since the structure is very similar: 50K x 17K, near-dense similarities. Items are 17K, so I did not activate the threshold in colSimilarities yet (it's at 1e-4).

Running Spark on YARN with 20 nodes, 4 cores, 16 GB, shuffle threshold 0.6.

I keep getting these from the col similarity code from the 1.2 branch. Should I use master?

    15/04/10 11:08:36 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(5, tblpmidn36adv-hdp.tdc.vzwcorp.com, 44410) with no recent heart beats: 50315ms exceeds 45000ms
    15/04/10 11:09:12 ERROR ContextCleaner: Error cleaning broadcast 1012
    java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
        at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
        at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1468)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133)
        at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)

I knew how to increase the 45 s timeout to something higher, as it is a compute-heavy job, but on YARN I am not sure how to set that config. In any case that's a warning and should not affect the job...

Any idea how to improve the runtime other than increasing the threshold to 1e-2? I will do that next.

Was the Netflix dataset benchmarked for the col-based similarity flow before? The similarity output from this dataset becomes near dense, so it is interesting for stress testing...

Thanks.
Deb
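For anyone reproducing the benchmark, a minimal sketch of the column-similarity call being discussed, using MLlib's RowMatrix (the tiny input matrix and the 0.01 threshold are illustrative; sc is an existing SparkContext):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    // Tiny stand-in for the ratings matrix: rows are users, columns are items.
    val rows = sc.parallelize(Seq(
      Vectors.dense(5.0, 0.0, 3.0),
      Vectors.dense(4.0, 1.0, 0.0),
      Vectors.dense(0.0, 2.0, 4.0)))
    val mat = new RowMatrix(rows)

    // Exact all-pairs cosine similarities between columns.
    val exact = mat.columnSimilarities()

    // DIMSUM sampling: pairs whose similarity falls below the threshold are
    // likely to be dropped, trading accuracy for a much cheaper shuffle.
    val approx = mat.columnSimilarities(threshold = 0.01)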
Getting OutOfMemory errors on Spark
Hi,

I'm reading data stored in S3, aggregating it, and storing it in Cassandra using a Spark job. When I run the job with approx 3 million records (about 3-4 GB of data) stored in text files, I get the following error:

    ... (11529/14925)
    15/04/10 19:32:43 INFO TaskSetManager: Starting task 11609.0 in stage 4.0 (TID 56384, spark-slaves-test-cluster-k0b6.c.silver-argon-837.internal, PROCESS_LOCAL, 134
    15/04/10 19:32:58 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOf(Arrays.java:2367)
        at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
        at java.lang.StringBuilder.append(StringBuilder.java:204)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3143)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3051)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
        at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
        at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
        at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)

This error occurs in the final step of my script, when I'm storing the processed records in Cassandra.
My memory-per-node is 10 GB, which means that all my records should fit on one machine. The script is in PySpark and I'm using a cluster with:

- Workers: 5
- Cores: 80 Total, 80 Used
- Memory: 506.5 GB Total, 40.0 GB Used

Here is the relevant part of the code, for reference:

    def connectAndSave(partition):
        # open one Cassandra session per partition, save each record, then shut down
        cluster = Cluster(['10.240.1.17'])
        dbsession = cluster.connect('load_test')
        ret = map(lambda x: saveUserData(x, dbsession), partition)
        dbsession.shutdown()
        cluster.shutdown()

    res = sessionsRdd.foreachPartition(lambda partition: connectAndSave(partition))
The $ notation for DataFrame Column
Hello,

The DataFrame documentation always uses $"columnX" to denote a column, but I cannot find much information about it. Maybe I have missed something. Can anyone point me to the doc about the $, if there is any?

Thanks.
Justin
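Not from the thread, but for reference: the $ syntax is an implicit string interpolator that comes into scope with the SQLContext implicits and builds a Column from a name. A small sketch, assuming df is an existing DataFrame with name and age columns (both names are made up):

    import sqlContext.implicits._   // brings the $"..." column syntax into scope

    // $"age" is equivalent to df("age") or org.apache.spark.sql.functions.col("age").
    val adults = df.filter($"age" > 21).select($"name", $"age" + 1)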
Re: ClassCastException when calling updateStateKey
Hi Marcelo,

I am not including Spark's classes. When I used the userClasspathFirst flag, I started getting those errors.

Been there, done that. Removing the Guava classes was one of the first things I tried. I saw your replies to a similar problem from Sept: http://apache-spark-developers-list.1001551.n3.nabble.com/guava-version-conflicts-td8480.html

It looks like my issue has the same cause, but different symptoms.

Thanks,
Pradeep.

On Fri, Apr 10, 2015 at 12:51 PM, Marcelo Vanzin van...@cloudera.com wrote:

On Fri, Apr 10, 2015 at 10:11 AM, Pradeep Rai prai...@gmail.com wrote:
I tried the userClasspathFirst flag. I started getting NoClassDefFound exceptions for Spark classes like Function2, etc.

Wait. Are you including Spark classes in your app's assembly? Don't do that...

As for Guava, yeah, the mess around Optional and friends is unfortunate. One way you could try to work around it, if excluding Spark classes and the userClassPathFirst option doesn't work, is to explicitly remove the Optional (and related) classes from your app's fat jar, and cross your fingers.

-- Marcelo
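Not from the thread, but one possible way to do what Marcelo suggests if the build uses sbt-assembly (a sketch against sbt-assembly 0.12+; the exact class list to drop depends on which Guava classes conflict in your build):

    // build.sbt fragment: discard the Guava classes that Spark already ships,
    // so the copies on Spark's classpath are the only ones seen at runtime.
    assemblyMergeStrategy in assembly := {
      case PathList("com", "google", "common", "base", "Optional.class") => MergeStrategy.discard
      case PathList("com", "google", "common", "base", "Absent.class")   => MergeStrategy.discard
      case PathList("com", "google", "common", "base", "Present.class")  => MergeStrategy.discard
      case other =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(other)
    }

A Maven build can do the same thing with excludes/filters in the shade plugin configuration.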
How to use the --files arg
Hi,

Suppose I have a command and I pass the --files arg as below:

    bin/spark-submit --class com.test.HelloWorld --master yarn-cluster --num-executors 8 --driver-memory 512m --executor-memory 2048m --executor-cores 4 --queue public --files $HOME/myfile.txt --name test_1 ~/test_code-1.0-SNAPSHOT.jar

Can anyone tell me how I access this file in my executors? Basically I want to read this file to get some configs. I tried to read from my HDFS home dir but that doesn't work.

Thanks,
Udit
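Not from the thread, but a common approach, sketched below: files shipped with --files are localized on every node, and SparkFiles.get resolves the node-local path inside a task (on YARN the file also usually lands in the container's working directory, so the bare file name often works too). The RDD name and the way the config is used are made up:

    import org.apache.spark.SparkFiles
    import scala.io.Source

    val withConfig = someRdd.mapPartitions { iter =>
      // Resolve the node-local copy of the file passed via --files.
      val path = SparkFiles.get("myfile.txt")
      val configLines = Source.fromFile(path).getLines().toList
      iter.map(record => (record, configLines.size))   // toy use of the config
    }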
RE: Is the disk space in SPARK_LOCAL_DIRS cleaned up?
Does anybody have an answer for this?

Thanks,
Ningjun

From: Wang, Ningjun (LNG-NPV)
Sent: Thursday, April 02, 2015 12:14 PM
To: user@spark.apache.org
Subject: Is the disk space in SPARK_LOCAL_DIRS cleaned up?

I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, Spark writes to this folder. I found that the disk space of this folder keeps increasing quickly, and at a certain point I will run out of disk space.

Does Spark clean up the disk space in this folder once the shuffle operation is done? If not, I need to write a job to clean it up myself. But how do I know which subfolders can be removed?

Ningjun
Re: coalesce(*, false) problem
Coalesce tries to reduce the number of partitions to a smaller number of partitions without moving the data around (as much as possible). Since most of the received data is on a few machines (those running receivers), coalesce just makes bigger merged partitions on those machines.

Without coalesce:
Machine 1: 10 partitions, processed in parallel
Machine 2: 2 partitions, processed in parallel

With coalesce:
Machine 1: 10 partitions merged into 1 partition, processed together, taking 10 times longer
Machine 2: 2 partitions merged into 1 partition, processed together, taking 2 times longer

Hope this clarifies.

TD

On Fri, Apr 10, 2015 at 5:16 AM, 邓刚 (Technology Center) triones.d...@vipshop.com wrote:

Hi All,

We are running a Spark Streaming application. The data source is Kafka, and the data partitioning in Kafka is not well distributed, but every receiver on every executor can receive data, just in different amounts. Our data is very large, so we tried a local repartition with coalesce(*, false), but we observed something odd: most of the tasks run on one executor (see picture one). When we remove the coalesce call, the tasks are distributed much better (see picture two). Does anyone know why?

[Picture one]

[Picture two]
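A small sketch of the trade-off TD describes (the stream name and partition counts are made up): repartition(n) is coalesce(n, shuffle = true), so it redistributes the received blocks across the whole cluster at the cost of a shuffle, while a shuffle-free coalesce only merges co-located partitions:

    // Narrow coalesce: merges partitions that already sit on the receiver
    // machines, so a few tasks end up doing most of the work.
    val merged = inputStream.transform(rdd => rdd.coalesce(12, shuffle = false))

    // Shuffle-based repartition: spreads records evenly over all executors
    // before the expensive processing step.
    val balanced = inputStream.repartition(120)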
Re: Streaming anomaly detection using ARIMA
Sean,

I do agree about the inside-out parallelization, but my curiosity is mostly in what type of performance I can expect by piping out to R. I'm playing with Twitter's new Anomaly Detection library btw; this could be a solution if I can get the calls to R to stand up to the massive dataset that I have. I'll report back my findings.

On Thu, Apr 2, 2015 at 3:46 AM, Sean Owen so...@cloudera.com wrote:

This inside-out parallelization has been a way people have used R with MapReduce for a long time. Run N copies of an R script on the cluster, on different subsets of the data, babysat by Mappers. You just need R installed on the cluster. Hadoop Streaming makes this easy, and things like RDD.pipe in Spark make it easier. So it may be just that simple and so there's not much to say about it. I haven't tried this with Spark Streaming but imagine it would also work. Have you tried this?

Within a window you would probably take the first x% as training and the rest as test. I don't think there's a question of looking across windows.

On Thu, Apr 2, 2015 at 12:31 AM, Corey Nolet cjno...@gmail.com wrote:

Surprised I haven't gotten any responses about this. Has anyone tried using rJava or FastR with Spark? I've seen the SparkR project but that goes the other way: what I'd like to do is use R for model calculation and Spark to distribute the load across the cluster. Also, has anyone used Scalation for ARIMA models?

On Mon, Mar 30, 2015 at 9:30 AM, Corey Nolet cjno...@gmail.com wrote:

Taking out the complexity of the ARIMA models to simplify things: I can't seem to find a good way to represent even standard moving averages in Spark Streaming. Perhaps it's my ignorance with the micro-batched style of the DStreams API.

On Fri, Mar 27, 2015 at 9:13 PM, Corey Nolet cjno...@gmail.com wrote:

I want to use ARIMA for a predictive model so that I can take time series data (metrics) and perform a light anomaly detection. The time series data is going to be bucketed into different time units (several minutes within several hours, several hours within several days, several days within several years).

I want to do the algorithm in Spark Streaming. I'm used to tuple-at-a-time streaming and I'm having a tad bit of trouble gaining insight into how exactly the windows are managed inside of DStreams.

Let's say I have a simple dataset that is keyed by a key/value tuple where the key is the name of the component whose metrics I want to run the algorithm against and the value is a metric (a value representing a sum for the time bucket). I want to create histograms of the time series data for each key in the windows in which they reside, so I can use that histogram vector to generate my ARIMA prediction (actually, it seems like this doesn't just apply to ARIMA but could apply to any sliding average).

I *think* my prediction code may look something like this:

    val predictionAverages = dstream
      .groupByKeyAndWindow(60*60*24, 60*60*24)
      .mapValues(applyARIMAFunction)

That is, keep 24 hours worth of metrics in each window and use that for the ARIMA prediction. The part I'm struggling with is how to join together the actual values so that I can do my comparison against the prediction model. Let's say dstream contains the actual values. For any time window, I should be able to take a previous set of windows and use the model to compare against the current values.
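For reference, a minimal sketch of the "inside out" approach Sean describes, using RDD.pipe to run one external R process per partition (the script path, record format, and RDD name are all made up, and R must be installed on every worker):

    // Each record is written as one line to the R process's stdin; the script is
    // expected to read stdin, fit/score its model, and print one line per input.
    val scored = metricsRdd
      .map { case (component, value) => s"$component,$value" }
      .pipe("Rscript /opt/models/arima_score.R")

    scored.take(5).foreach(println)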
DataFrame column name restriction
Hello,

Are there any restrictions on column names? I tried to use ".", but sqlContext.sql cannot find the column. I would guess that "." is tricky, as it affects accessing StructType fields, but are there any more restrictions on column names?

    scala> case class A(a: Int)
    defined class A

    scala> sqlContext.createDataFrame(Seq(A(10), A(20))).withColumn("b.b", $"a" + 1)
    res19: org.apache.spark.sql.DataFrame = [a: int, b.b: int]

    scala> res19.registerTempTable("res19")

    scala> res19.select("a")
    res24: org.apache.spark.sql.DataFrame = [a: int]

    scala> res19.select("a", "b.b")
    org.apache.spark.sql.AnalysisException: cannot resolve 'b.b' given input columns a, b.b;
        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

Thanks.
Justin
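Not a definitive answer, but the dot is special because a dotted name is parsed as struct-field access. Two workarounds worth trying, as a sketch only since behaviour varies by Spark version (this reuses the A case class from the transcript above):

    val df = sqlContext.createDataFrame(Seq(A(10), A(20))).withColumn("b.b", $"a" + 1)

    // Escape the dotted name with backticks (supported in newer releases):
    df.select("a", "`b.b`")

    // Or avoid the dot altogether by renaming the column:
    df.withColumnRenamed("b.b", "b_b").select("a", "b_b")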
foreach going in infinite loop
Hi All,

I am running the code below. Before calling foreach I did 3 transformations using mapToPair. In my application there are 16 executors, but no executor is running anything.

    rddWithscore.foreach(new VoidFunction<Tuple2<VendorRecord, Map<Integer, Double>>>() {
        @Override
        public void call(Tuple2<VendorRecord, Map<Integer, Double>> t) throws Exception {
            // find the entry with the highest score for this vendor
            Entry<Integer, Double> maxEntry = null;
            for (Entry<Integer, Double> entry : t._2.entrySet()) {
                if (maxEntry == null || entry.getValue() > maxEntry.getValue()) {
                    maxEntry = entry;
                    // updateVendorData(maxEntry.getKey());
                }
                log.info("for vendor: " + t._1.getVendorId() + " matched company is " + maxEntry.getKey());
            }
        }
    });