[jira] [Updated] (SPARK-18621) PySQL SQL Types (aka DataFrame Schema) have __repr__() with Scala and not Python representation
[ https://issues.apache.org/jira/browse/SPARK-18621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Romi Kuntsman updated SPARK-18621:
----------------------------------
Description:

When using Python's repr() on an object, the expected result is a string that Python can evaluate to construct the object.
See: https://docs.python.org/2/library/functions.html#func-repr

However, when getting a DataFrame schema in PySpark, the code (in the "__repr__()" overload methods) returns the string representation for Scala, rather than for Python.

Relevant code in PySpark:
https://github.com/apache/spark/blob/5f02d2e5b4d37f554629cbd0e488e856fffd7b6b/python/pyspark/sql/types.py#L442

Python code:
{code}
# 1. define object
struct1 = StructType([StructField("f1", StringType(), True)])
# 2. print representation, expected to be like above
print(repr(struct1))
# 3. actual result:
# StructType(List(StructField(f1,StringType,true)))
# 4. try to use the result in code
struct2 = StructType(List(StructField(f1,StringType,true)))
# 5. get a bunch of errors
# Unresolved reference 'List'
# Unresolved reference 'f1'
# StringType is a class, not a constructed object
# Unresolved reference 'true'
{code}

(was: the same description, without the {code} markers)

> PySQL SQL Types (aka DataFrame Schema) have __repr__() with Scala and not Python representation
> -----------------------------------------------------------------------------------------------
>
> Key: SPARK-18621
> URL: https://issues.apache.org/jira/browse/SPARK-18621
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.6.2, 2.0.2
> Reporter: Romi Kuntsman
> Priority: Minor

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
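A repr() that round-trips through eval() - the behavior the reporter expects - can be sketched with simplified stand-ins for the PySpark types (these are illustrative toy classes, not the actual pyspark.sql.types implementations):

```python
# Toy stand-ins for pyspark.sql.types classes (NOT the real implementations),
# showing what an eval()-able __repr__ would look like.

class StringType:
    def __repr__(self):
        return "StringType()"


class StructField:
    def __init__(self, name, dataType, nullable=True):
        self.name = name
        self.dataType = dataType
        self.nullable = nullable

    def __repr__(self):
        # Emit valid Python: quoted field name, constructed type, Python booleans.
        return "StructField(%r, %r, %r)" % (self.name, self.dataType, self.nullable)


class StructType:
    def __init__(self, fields):
        self.fields = fields

    def __repr__(self):
        return "StructType(%r)" % (self.fields,)


struct1 = StructType([StructField("f1", StringType(), True)])
# The repr string is itself valid Python, so it round-trips:
struct2 = eval(repr(struct1))
```

With __repr__ emitting quoted names, constructed type objects, and True/False instead of Scala's f1/StringType/true, eval(repr(schema)) reconstructs an equivalent schema rather than raising unresolved-reference errors.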
[jira] [Created] (SPARK-18621) PySQL SQL Types (aka DataFrame Schema) have __repr__() with Scala and not Python representation
Romi Kuntsman created SPARK-18621:
----------------------------------
Summary: PySQL SQL Types (aka DataFrame Schema) have __repr__() with Scala and not Python representation
Key: SPARK-18621
URL: https://issues.apache.org/jira/browse/SPARK-18621
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.0.2, 1.6.2
Reporter: Romi Kuntsman
Priority: Minor

When using Python's repr() on an object, the expected result is a string that Python can evaluate to construct the object.
See: https://docs.python.org/2/library/functions.html#func-repr

However, when getting a DataFrame schema in PySpark, the code (in the "__repr__()" overload methods) returns the string representation for Scala, rather than for Python.

Relevant code in PySpark:
https://github.com/apache/spark/blob/5f02d2e5b4d37f554629cbd0e488e856fffd7b6b/python/pyspark/sql/types.py#L442

Python code:

# 1. define object
struct1 = StructType([StructField("f1", StringType(), True)])
# 2. print representation, expected to be like above
print(repr(struct1))
# 3. actual result:
# StructType(List(StructField(f1,StringType,true)))
# 4. try to use the result in code
struct2 = StructType(List(StructField(f1,StringType,true)))
# 5. get a bunch of errors
# Unresolved reference 'List'
# Unresolved reference 'f1'
# StringType is a class, not a constructed object
# Unresolved reference 'true'
Israel Spark Meetup
Hello,

Please add a link on the Spark Community page (https://spark.apache.org/community.html) to the Israel Spark Meetup (https://www.meetup.com/israel-spark-users/). We're an active meetup group that unifies the local Spark user community and holds regular meetups.

Thanks!
Romi K.
Re: SparkSession replace SQLContext
You can also claim that there's a whole section of "Migrating from 1.6 to 2.0" missing there: https://spark.apache.org/docs/2.0.0-preview/sql-programming-guide.html#migration-guide *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Jul 5, 2016 at 12:24 PM, nihed mbarek <nihe...@gmail.com> wrote: > Hi, > > I just discover that that SparkSession will replace SQLContext for spark > 2.0 > JavaDoc is clear > https://spark.apache.org/docs/2.0.0-preview/api/java/org/apache/spark/sql/SparkSession.html > but there is no mention in sql programming guide > > https://spark.apache.org/docs/2.0.0-preview/sql-programming-guide.html#starting-point-sqlcontext > > Is it possible to update documentation before the release ? > > > Thank you > > -- > > MBAREK Med Nihed, > Fedora Ambassador, TUNISIA, Northern Africa > http://www.nihed.com > > <http://tn.linkedin.com/in/nihed> > >
[jira] [Commented] (SPARK-4452) Shuffle data structures can starve others on the same thread for memory
[ https://issues.apache.org/jira/browse/SPARK-4452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15255658#comment-15255658 ]

Romi Kuntsman commented on SPARK-4452:
--------------------------------------
Hi, what's the reason this will only be available in Spark 2.0.0, and not 1.6.4 or 1.7.0?

> Shuffle data structures can starve others on the same thread for memory
> ------------------------------------------------------------------------
>
> Key: SPARK-4452
> URL: https://issues.apache.org/jira/browse/SPARK-4452
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.1.0
> Reporter: Tianshuo Deng
> Assignee: Tianshuo Deng
> Fix For: 2.0.0
>
> When an Aggregator is used with ExternalSorter in a task, Spark will create many small files and could cause a "too many files open" error during merging.
> Currently, ShuffleMemoryManager does not work well when there are 2 spillable objects in a thread, which in this case are ExternalSorter and ExternalAppendOnlyMap (used by Aggregator). Here is an example: due to the use of map-side aggregation, ExternalAppendOnlyMap is created first to read the RDD. It may ask for as much memory as it can get, which is totalMem/numberOfThreads. Later on, when ExternalSorter is created in the same thread, the ShuffleMemoryManager could refuse to allocate more memory to it, since the memory is already given to the previously requesting object (ExternalAppendOnlyMap). That causes the ExternalSorter to keep spilling small files (due to the lack of memory).
> I'm currently working on a PR to address these two issues. It will include the following changes:
> 1. The ShuffleMemoryManager should not only track the memory usage for each thread, but also the object which holds the memory.
> 2. The ShuffleMemoryManager should be able to trigger the spilling of a spillable object. In this way, if a new object in a thread is requesting memory, the old occupant could be evicted/spilled. Previously the spillable objects triggered spilling by themselves, so one might not trigger spilling even if another object in the same thread needed more memory. After this change, the ShuffleMemoryManager can trigger the spilling of an object if it needs to.
> 3. Make the iterator of ExternalAppendOnlyMap spillable. Previously ExternalAppendOnlyMap returned a destructive iterator and could not be spilled after the iterator was returned. This should be changed so that even after the iterator is returned, the ShuffleMemoryManager can still spill it.
> Currently, I have a working branch in progress: https://github.com/tsdeng/spark/tree/enhance_memory_manager. Already made change 3 and have a prototype of changes 1 and 2 to evict spillables from the memory manager, still in progress. I will send a PR when it's done.
> Any feedback or thoughts on this change are highly appreciated!
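Proposed changes 1 and 2 can be illustrated with a toy model (a simplified sketch, not Spark's actual ShuffleMemoryManager): the manager tracks which object holds memory, and when a new request cannot be granted it forces another holder in the same thread to spill instead of starving the requester.

```python
# Toy sketch of per-object memory tracking with manager-triggered spilling.
# All names and numbers are illustrative; this is not Spark code.

class Spillable:
    def __init__(self, name):
        self.name = name
        self.held = 0      # bytes currently held
        self.spills = 0    # how many times we were forced to spill

    def spill(self):
        # Release everything this consumer holds back to the pool.
        freed, self.held = self.held, 0
        self.spills += 1
        return freed


class MemoryManager:
    def __init__(self, total):
        self.total = total
        self.consumers = []

    def acquire(self, consumer, amount):
        if consumer not in self.consumers:
            self.consumers.append(consumer)
        used = sum(c.held for c in self.consumers)
        if used + amount > self.total:
            # Change 2: evict the biggest *other* holder instead of refusing.
            victims = sorted((c for c in self.consumers if c is not consumer),
                             key=lambda c: c.held, reverse=True)
            for victim in victims:
                if used + amount <= self.total:
                    break
                used -= victim.spill()
        if used + amount <= self.total:
            consumer.held += amount
            return True
        return False


mgr = MemoryManager(total=100)
the_map = Spillable("ExternalAppendOnlyMap")
sorter = Spillable("ExternalSorter")
mgr.acquire(the_map, 90)       # the map grabs most of the memory first
ok = mgr.acquire(sorter, 50)   # the manager spills the map to make room
```

This mirrors the scenario in the issue: without manager-triggered spilling the sorter's request would simply be refused, leaving it to spill tiny files forever.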
Re: [discuss] ending support for Java 7 in Spark 2.0
+1 for Java 8 only. I think it will make it easier to make a unified API for Java and Scala, instead of the wrappers of Java over Scala.

On Mar 24, 2016 11:46 AM, "Stephen Boesch" wrote:
> +1 for java8 only. +1 for 2.11+ only. At this point scala libraries supporting only 2.10 are typically less active and/or poorly maintained. That trend will only continue when considering the lifespan of spark 2.X.
>
> 2016-03-24 11:32 GMT-07:00 Steve Loughran:
>>
>> On 24 Mar 2016, at 15:27, Koert Kuipers wrote:
>>
>> i think the arguments are convincing, but it also makes me wonder if i live in some kind of alternate universe... we deploy on customers' clusters, where the OS, python version, java version and hadoop distro are not chosen by us. so think centos 6, cdh5 or hdp 2.3, java 7 and python 2.6. we simply have access to a single proxy machine and launch through yarn. asking them to upgrade java is pretty much out of the question or a 6+ month ordeal. of the 10 client clusters i can think of off the top of my head, all of them are on java 7, none are on java 8. so by doing this you would make spark 2 basically unusable for us (unless most of them have plans of upgrading in the near term to java 8, i will ask around and report back...).
>>
>> It's not actually mandatory for the process executing in the YARN cluster to run with the same JVM as the rest of the Hadoop stack; all that is needed is for the environment variables to set up JAVA_HOME and PATH. Switching JVMs is not something which YARN makes easy to do, but it may be possible, especially if Spark itself provides some hooks, so you don't have to manually play with setting things up. That may be something which could significantly ease adoption of Spark 2 in YARN clusters. Same for Python.
>>
>> This is something I could probably help others to address
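The environment-variable route Steve describes can be tried with existing config knobs; a sketch (the /opt/java8 path is illustrative and would need to exist on every node):

```shell
# Illustrative only: run the YARN containers under a Java 8 install
# present on every node, while the cluster-wide default JVM stays Java 7.
spark-submit \
  --master yarn \
  --conf spark.executorEnv.JAVA_HOME=/opt/java8 \
  --conf spark.yarn.appMasterEnv.JAVA_HOME=/opt/java8 \
  app.jar
```

spark.executorEnv.* and spark.yarn.appMasterEnv.* set environment variables for the executors and the YARN application master respectively, which is exactly the hook needed here.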
Re: Spark 1.6.1
Sounds fair. Is it to avoid cluttering Maven Central with too many intermediate versions? What do I need to add in my pom.xml <repositories> section to make it work?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Tue, Feb 23, 2016 at 9:34 AM, Reynold Xin <r...@databricks.com> wrote:
> We usually publish to a staging maven repo hosted by the ASF (not maven central).
>
> On Mon, Feb 22, 2016 at 11:32 PM, Romi Kuntsman <r...@totango.com> wrote:
>> Is it possible to make RC versions available via Maven? (many projects do that)
>> That will make integration much easier, so many more people can test the version before the final release.
>> Thanks!
>>
>> On Tue, Feb 23, 2016 at 8:07 AM, Luciano Resende <luckbr1...@gmail.com> wrote:
>>> On Mon, Feb 22, 2016 at 9:08 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>> An update: people.apache.org has been shut down so the release scripts are broken. Will try again after we fix them.
>>>>
>>> If you skip uploading to people.a.o, it should still be available in nexus for review.
>>>
>>> The other option is to add the RC into https://dist.apache.org/repos/dist/dev/
>>>
>>> --
>>> Luciano Resende
>>> http://people.apache.org/~lresende
>>> http://twitter.com/lresende1975
>>> http://lresende.blogspot.com/
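For reference, pulling from an ASF staging repo is done by adding a repository entry to the pom; a sketch (the id and URL below are placeholders - the actual staging repository URL changes per release candidate and is posted in the vote thread):

```xml
<!-- Placeholder only: substitute the staging URL announced in the RC vote thread -->
<repositories>
  <repository>
    <id>apache-spark-staging</id>
    <url>https://repository.apache.org/content/repositories/orgapachespark-NNNN/</url>
  </repository>
</repositories>
```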
Re: Spark 1.6.1
Is it possible to make RC versions available via Maven? (many projects do that) That will make integration much easier, so many more people can test the version before the final release. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Feb 23, 2016 at 8:07 AM, Luciano Resende <luckbr1...@gmail.com> wrote: > > > On Mon, Feb 22, 2016 at 9:08 PM, Michael Armbrust <mich...@databricks.com> > wrote: > >> An update: people.apache.org has been shut down so the release scripts >> are broken. Will try again after we fix them. >> >> > If you skip uploading to people.a.o, it should still be available in nexus > for review. > > The other option is to add the RC into > https://dist.apache.org/repos/dist/dev/ > > > > -- > Luciano Resende > http://people.apache.org/~lresende > http://twitter.com/lresende1975 > http://lresende.blogspot.com/ >
Re: Spark 1.6.1
Hi Michael, What about the memory leak bug? https://issues.apache.org/jira/browse/SPARK-11293 Even after the memory rewrite in 1.6.0, it still happens in some cases. Will it be fixed for 1.6.1? Thanks, *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Feb 1, 2016 at 9:59 PM, Michael Armbrust <mich...@databricks.com> wrote: > We typically do not allow changes to the classpath in maintenance releases. > > On Mon, Feb 1, 2016 at 8:16 AM, Hamel Kothari <hamelkoth...@gmail.com> > wrote: > >> I noticed that the Jackson dependency was bumped to 2.5 in master for >> something spark-streaming related. Is there any reason that this upgrade >> can't be included with 1.6.1? >> >> According to later comments on this thread: >> https://issues.apache.org/jira/browse/SPARK-8332 and my personal >> experience using with Spark with Jackson 2.5 hasn't caused any issues but >> it does have some useful new features. It should be fully backwards >> compatible according to the Jackson folks. >> >> On Mon, Feb 1, 2016 at 10:29 AM Ted Yu <yuzhih...@gmail.com> wrote: >> >>> SPARK-12624 has been resolved. >>> According to Wenchen, SPARK-12783 is fixed in 1.6.0 release. >>> >>> Are there other blockers for Spark 1.6.1 ? >>> >>> Thanks >>> >>> On Wed, Jan 13, 2016 at 5:39 PM, Michael Armbrust < >>> mich...@databricks.com> wrote: >>> >>>> Hey All, >>>> >>>> While I'm not aware of any critical issues with 1.6.0, there are >>>> several corner cases that users are hitting with the Dataset API that are >>>> fixed in branch-1.6. As such I'm considering a 1.6.1 release. >>>> >>>> At the moment there are only two critical issues targeted for 1.6.1: >>>> - SPARK-12624 - When schema is specified, we should treat undeclared >>>> fields as null (in Python) >>>> - SPARK-12783 - Dataset map serialization error >>>> >>>> When these are resolved I'll likely begin the release process. If >>>> there are any other issues that we should wait for please contact me. >>>> >>>> Michael >>>> >>> >>> >
[jira] [Commented] (SPARK-11293) Spillable collections leak shuffle memory
[ https://issues.apache.org/jira/browse/SPARK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15098008#comment-15098008 ]

Romi Kuntsman commented on SPARK-11293:
---------------------------------------
so add 1.6.0 as affected version...

> Spillable collections leak shuffle memory
> -----------------------------------------
>
> Key: SPARK-11293
> URL: https://issues.apache.org/jira/browse/SPARK-11293
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.3.1, 1.4.1, 1.5.1
> Reporter: Josh Rosen
> Assignee: Josh Rosen
> Priority: Critical
> Fix For: 1.6.0
>
> I discovered multiple leaks of shuffle memory while working on my memory manager consolidation patch, which added the ability to do strict memory leak detection for the bookkeeping that used to be performed by the ShuffleMemoryManager. This uncovered a handful of places where tasks can acquire execution/shuffle memory but never release it, starving themselves of memory.
> Problems that I found:
> * {{ExternalSorter.stop()}} should release the sorter's shuffle/execution memory.
> * BlockStoreShuffleReader should call {{ExternalSorter.stop()}} using a {{CompletionIterator}}.
> * {{ExternalAppendOnlyMap}} exposes no equivalent of {{stop()}} for freeing its resources.
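The {{CompletionIterator}} named in the fix list follows a simple pattern, sketched here as self-contained Python (Spark's actual implementation is Scala): wrap an iterator so a cleanup callback fires exactly once when it is exhausted, ensuring something like ExternalSorter.stop() runs even though the consumer only sees a plain iterator.

```python
# Sketch of the completion-iterator pattern: run a cleanup callback
# exactly once when the wrapped iterator is exhausted.

class CompletionIterator:
    def __init__(self, it, on_complete):
        self._it = iter(it)
        self._on_complete = on_complete
        self._done = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._it)
        except StopIteration:
            if not self._done:
                self._done = True
                self._on_complete()  # e.g. release the sorter's memory
            raise


released = []
records = CompletionIterator([1, 2, 3], lambda: released.append(True))
total = sum(records)  # exhausting the iterator triggers the cleanup
```

The consumer never has to remember to call stop(); finishing iteration is enough, which is exactly why the fix routes ExternalSorter.stop() through this wrapper.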
[jira] [Commented] (SPARK-11293) Spillable collections leak shuffle memory
[ https://issues.apache.org/jira/browse/SPARK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15096375#comment-15096375 ]

Romi Kuntsman commented on SPARK-11293:
---------------------------------------
so should be reopened or not? is there still a memory leak? is there a new memory leak instead of the old one?

> Spillable collections leak shuffle memory
> Key: SPARK-11293
> URL: https://issues.apache.org/jira/browse/SPARK-11293
[jira] [Commented] (SPARK-3665) Java API for GraphX
[ https://issues.apache.org/jira/browse/SPARK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15085452#comment-15085452 ]

Romi Kuntsman commented on SPARK-3665:
--------------------------------------
So at what version of Spark is it expected to happen?

> Java API for GraphX
> -------------------
>
> Key: SPARK-3665
> URL: https://issues.apache.org/jira/browse/SPARK-3665
> Project: Spark
> Issue Type: Improvement
> Components: GraphX, Java API
> Affects Versions: 1.0.0
> Reporter: Ankur Dave
> Assignee: Ankur Dave
>
> The Java API will wrap the Scala API in a similar manner as JavaRDD. Components will include:
> # JavaGraph
> #- removes optional param from persist, subgraph, mapReduceTriplets, Graph.fromEdgeTuples, Graph.fromEdges, Graph.apply
> #- removes implicit {{=:=}} param from mapVertices, outerJoinVertices
> #- merges multiple parameter lists
> #- incorporates GraphOps
> # JavaVertexRDD
> # JavaEdgeRDD
[jira] [Issue Comment Deleted] (SPARK-3665) Java API for GraphX
[ https://issues.apache.org/jira/browse/SPARK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Romi Kuntsman updated SPARK-3665:
---------------------------------
Comment: was deleted
(was: So at what version of Spark is it expected to happen?)

> Java API for GraphX
> Key: SPARK-3665
> URL: https://issues.apache.org/jira/browse/SPARK-3665
[jira] [Commented] (SPARK-3665) Java API for GraphX
[ https://issues.apache.org/jira/browse/SPARK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15085454#comment-15085454 ]

Romi Kuntsman commented on SPARK-3665:
--------------------------------------
So at what version of Spark is it expected to happen?

> Java API for GraphX
> Key: SPARK-3665
> URL: https://issues.apache.org/jira/browse/SPARK-3665
Re: Shuffle FileNotFound Exception
take executor memory times spark.shuffle.memoryFraction and divide the data so that each partition is less than the above *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Wed, Nov 18, 2015 at 2:09 PM, Tom Arnfeld <t...@duedil.com> wrote: > Hi Romi, > > Thanks! Could you give me an indication of how much increase the > partitions by? We’ll take a stab in the dark, the input data is around 5M > records (though each record is fairly small). We’ve had trouble both with > DataFrames and RDDs. > > Tom. > > On 18 Nov 2015, at 12:04, Romi Kuntsman <r...@totango.com> wrote: > > I had many issues with shuffles (but not this one exactly), and what > eventually solved it was to repartition to input into more parts. Have you > tried that? > > P.S. not sure if related, but there's a memory leak in the shuffle > mechanism > https://issues.apache.org/jira/browse/SPARK-11293 > > *Romi Kuntsman*, *Big Data Engineer* > http://www.totango.com > > On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld <t...@duedil.com> wrote: > >> Hey, >> >> I’m wondering if anyone has run into issues with Spark 1.5 and a >> FileNotFound exception with shuffle.index files? It’s been cropping up with >> very large joins and aggregations, and causing all of our jobs to fail >> towards the end. The memory limit for the executors (we’re running on >> mesos) is touching 60GB+ with ~10 cores per executor, which is way >> oversubscribed. >> >> We’re running spark inside containers, and have configured >> “spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the >> container for performance/disk reasons, and since then the issue started to >> arise. I’m wondering if there’s a bug with the way spark looks for shuffle >> files, and one of the implementations isn’t obeying the path properly? >> >> I don’t want to set "spark.local.dir” because that requires the driver >> also have this directory set up, which is not the case. >> >> Has anyone seen this issue before? 
>> >> >> >> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to >> get block(s) from XXX:50777 >> java.lang.RuntimeException: java.io.FileNotFoundException: >> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index >> (No such file or directory) >>at java.io.FileInputStream.open(Native Method) >>at java.io.FileInputStream.(FileInputStream.java:146) >>at >> org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98) >>at >> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300) >>at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >>at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >>at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >>at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >>at >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >>at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >>at >> scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >>at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) >>at >> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) >>at >> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114) >>at >> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87) >>at >> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101) >>at >> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) >>at >> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) >>at >> 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) >>at >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) >>at >> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) >>at >> io.netty.channel.AbstractChannelHandlerContext.invokeChann
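The sizing advice at the top of the thread can be turned into a back-of-the-envelope calculation; a sketch with illustrative numbers (dividing by cores-per-executor is an added assumption here, since concurrent tasks on one executor share the shuffle budget):

```python
# Rough partition-count estimate from the rule of thumb above:
# each partition should fit in (executor memory x spark.shuffle.memoryFraction),
# split across the tasks running concurrently on one executor.
import math

def min_partitions(data_bytes, executor_mem_bytes, memory_fraction, cores_per_executor):
    per_task_budget = executor_mem_bytes * memory_fraction / cores_per_executor
    return math.ceil(data_bytes / per_task_budget)

# e.g. ~100 GB of shuffle data, 60 GB executors, default fraction 0.2, 10 cores
parts = min_partitions(100 * 2**30, 60 * 2**30, 0.2, 10)
```

This is only a starting point; actual per-record overhead in deserialized form is usually larger than the on-disk size, so rounding the result up generously is safer.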
[jira] [Commented] (SPARK-11293) Spillable collections leak shuffle memory
[ https://issues.apache.org/jira/browse/SPARK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008738#comment-15008738 ]

Romi Kuntsman commented on SPARK-11293:
---------------------------------------
The memory manager was rewritten there? Could it have introduced a memory leak in a different place or of a different kind? Is there a regression test to verify?

> Spillable collections leak shuffle memory
> Key: SPARK-11293
> URL: https://issues.apache.org/jira/browse/SPARK-11293
[jira] [Commented] (SPARK-6962) Netty BlockTransferService hangs in the middle of SQL query
[ https://issues.apache.org/jira/browse/SPARK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15005851#comment-15005851 ]

Romi Kuntsman commented on SPARK-6962:
--------------------------------------
what's the status of this? something similar happens to me in 1.4.0 and also in 1.5.1 - the job hangs forever with the largest shuffle. when increasing the number of partitions (as a function of the data size), the issue is fixed

> Netty BlockTransferService hangs in the middle of SQL query
> -----------------------------------------------------------
>
> Key: SPARK-6962
> URL: https://issues.apache.org/jira/browse/SPARK-6962
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.2.0, 1.2.1, 1.3.0
> Reporter: Jon Chase
> Attachments: jstacks.txt
>
> Spark SQL queries (though this seems to be a Spark Core issue - I'm just using queries in the REPL to surface this, so I mention Spark SQL) hang indefinitely under certain (not totally understood) circumstances.
> This is resolved by setting spark.shuffle.blockTransferService=nio, which seems to point to netty as the issue. Netty was set as the default for the block transport layer in 1.2.0, which is when this issue started. Setting the service to nio allows queries to complete normally.
> I do not see this problem when running queries over smaller (~20 5MB files) datasets. When I increase the scope to include more data (several hundred ~5MB files), the queries will get through several steps but eventually hang indefinitely.
> Here's the email chain regarding this issue, including stack traces:
> http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/<cae61spfqt2y7d5vqzomzz2dmr-jx2c2zggcyky40npkjjx4...@mail.gmail.com>
> For context, here's the announcement regarding the block transfer service change:
> http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/<cabpqxssl04q+rbltp-d8w+z3atn+g-um6gmdgdnh-hzcvd-...@mail.gmail.com>
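The workaround named in the report is applied at submit time; for example:

```shell
# Workaround from the issue description: fall back to the nio block
# transfer service instead of the netty default introduced in 1.2.0.
spark-submit --conf spark.shuffle.blockTransferService=nio ...
```

Note this only sidesteps the hang; the netty path remains the default, so the underlying issue still needs a fix.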
Re: Some spark apps fail with "All masters are unresponsive", while others pass normally
If they have a problem managing memory, wouldn't there should be a OOM? Why does AppClient throw a NPE? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 9, 2015 at 4:59 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote: > Is that all you have in the executor logs? I suspect some of those jobs > are having a hard time managing the memory. > > Thanks > Best Regards > > On Sun, Nov 1, 2015 at 9:38 PM, Romi Kuntsman <r...@totango.com> wrote: > >> [adding dev list since it's probably a bug, but i'm not sure how to >> reproduce so I can open a bug about it] >> >> Hi, >> >> I have a standalone Spark 1.4.0 cluster with 100s of applications running >> every day. >> >> From time to time, the applications crash with the following error (see >> below) >> But at the same time (and also after that), other applications are >> running, so I can safely assume the master and workers are working. >> >> 1. why is there a NullPointerException? (i can't track the scala stack >> trace to the code, but anyway NPE is usually a obvious bug even if there's >> actually a network error...) >> 2. why can't it connect to the master? (if it's a network timeout, how to >> increase it? i see the values are hardcoded inside AppClient) >> 3. how to recover from this error? >> >> >> ERROR 01-11 15:32:54,991SparkDeploySchedulerBackend - Application >> has been killed. Reason: All masters are unresponsive! Giving up. 
>> ERROR
>> ERROR 01-11 15:32:55,087 OneForOneStrategy - ERROR logs/error.log
>> java.lang.NullPointerException
>> at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>> at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>> at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> ERROR 01-11 15:32:55,603 SparkContext - Error initializing SparkContext.
>> ERROR
>> java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
>> at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
>> at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
>> at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
>> at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
>> at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
>>
>>
>> Thanks!
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>
>
Re: Some spark apps fail with "All masters are unresponsive", while others pass normally
I didn't see anything about an OOM. This sometimes happens before anything in the application has run, and it hits several applications at the same time - so I guess it's a communication failure, but the problem is that the error shown doesn't represent the actual cause (which may be a network timeout etc.) *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 9, 2015 at 6:00 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote: > Did you find anything regarding the OOM in the executor logs? > > Thanks > Best Regards > > On Mon, Nov 9, 2015 at 8:44 PM, Romi Kuntsman <r...@totango.com> wrote: > >> If they have a problem managing memory, wouldn't there should be a OOM? >> Why does AppClient throw a NPE? >> >> *Romi Kuntsman*, *Big Data Engineer* >> http://www.totango.com >> >> On Mon, Nov 9, 2015 at 4:59 PM, Akhil Das <ak...@sigmoidanalytics.com> >> wrote: >> >>> Is that all you have in the executor logs? I suspect some of those jobs >>> are having a hard time managing the memory. >>> >>> Thanks >>> Best Regards >>> >>> On Sun, Nov 1, 2015 at 9:38 PM, Romi Kuntsman <r...@totango.com> wrote: >>> >>>> [adding dev list since it's probably a bug, but i'm not sure how to >>>> reproduce so I can open a bug about it] >>>> >>>> Hi, >>>> >>>> I have a standalone Spark 1.4.0 cluster with 100s of applications >>>> running every day. >>>> >>>> From time to time, the applications crash with the following error (see >>>> below) >>>> But at the same time (and also after that), other applications are >>>> running, so I can safely assume the master and workers are working. >>>> >>>> 1. why is there a NullPointerException? (i can't track the scala stack >>>> trace to the code, but anyway NPE is usually a obvious bug even if there's >>>> actually a network error...) >>>> 2. why can't it connect to the master? (if it's a network timeout, how >>>> to increase it? i see the values are hardcoded inside AppClient) >>>> 3. how to recover from this error?
>>>> >>>> >>>> ERROR 01-11 15:32:54,991SparkDeploySchedulerBackend - Application >>>> has been killed. Reason: All masters are unresponsive! Giving up. ERROR >>>> ERROR 01-11 15:32:55,087 OneForOneStrategy - ERROR >>>> logs/error.log >>>> java.lang.NullPointerException NullPointerException >>>> at >>>> org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160) >>>> at >>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) >>>> at >>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) >>>> at >>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) >>>> at >>>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59) >>>> at >>>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) >>>> at >>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) >>>> at >>>> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) >>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465) >>>> at >>>> org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61) >>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>> at >>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>> ERROR 01-11 15:32:55,603 SparkContext - 
Error >>>> initializing SparkContext. ERROR >>>> java.lang.IllegalStateException: Cannot call methods on a stopped >>>> SparkContext >>>> at org.apache.spark.SparkContext.org >>>> $apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103) >>>> at >>>> org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501) >>>> at >>>> org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005) >>>> at org.apache.spark.SparkContext.(SparkContext.scala:543) >>>> at >>>> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61) >>>> >>>> >>>> Thanks! >>>> >>>> *Romi Kuntsman*, *Big Data Engineer* >>>> http://www.totango.com >>>> >>> >>> >> >
[jira] [Commented] (SPARK-3767) Support wildcard in Spark properties
[ https://issues.apache.org/jira/browse/SPARK-3767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996389#comment-14996389 ] Romi Kuntsman commented on SPARK-3767: -- [~andrewor14] what's going on with this issue? I found a link to it here: http://mail-archives.us.apache.org/mod_mbox/spark-user/201410.mbox/%3ccamjob8kg3_jhh_8ibnx04wya-fi7aeghs+fh1nalmembac7...@mail.gmail.com%3E I want to connect to JMX in Executors, which means each one needs a different port number, is it possible? > Support wildcard in Spark properties > > > Key: SPARK-3767 > URL: https://issues.apache.org/jira/browse/SPARK-3767 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 1.1.0 >Reporter: Andrew Or > > If the user sets spark.executor.extraJavaOptions, he/she may want to express > the value in terms of the executor ID, for instance. In general it would be a > feature that many will find useful.
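Until the wildcard feature requested in SPARK-3767 exists, one commonly suggested workaround for the per-executor JMX port question (a hedged sketch, not something confirmed in this ticket) is to set the JMX port to 0, which makes each executor JVM bind a free ephemeral port, so multiple executors on one host do not collide:

```
# spark-defaults.conf -- sketch only; these are the standard JVM JMX
# system properties, and port 0 means "pick a free ephemeral port
# per executor JVM"
spark.executor.extraJavaOptions -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
```

The trade-off is that the chosen port must then be discovered per executor (e.g. from the process's listening sockets) before attaching JConsole.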
Re: Ready to talk about Spark 2.0?
A major release usually means giving up on some API backward compatibility? Can this be used as a chance to merge efforts with Apache Flink ( https://flink.apache.org/) and create the one ultimate open source big data processing system? Spark currently feels like it was made for interactive use (like Python and R), and when used for other workloads (batch/streaming), it feels like scripted interactive use rather than a truly standalone, complete app. Maybe some base concepts could be adapted? (I'm not currently a committer, but as a heavy Spark user I'd love to participate in the discussion of what can/should be in Spark 2.0) *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Fri, Nov 6, 2015 at 2:53 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote: > Hi Sean, > > Happy to see this discussion. > > I'm working on PoC to run Camel on Spark Streaming. The purpose is to have > an ingestion and integration platform directly running on Spark Streaming. > > Basically, we would be able to use a Camel Spark DSL like: > > > from("jms:queue:foo").choice().when(predicate).to("job:bar").when(predicate).to("hdfs:path").otherwise("file:path") > > Before a formal proposal (I have to do more work there), I'm just > wondering if such framework can be a new Spark module (Spark Integration > for instance, like Spark ML, Spark Stream, etc). > > Maybe it could be a good candidate for an addition in a "major" release > like Spark 2.0. > > Just my $0.01 ;) > > Regards > JB > > > On 11/06/2015 01:44 PM, Sean Owen wrote: > >> Since branch-1.6 is cut, I was going to make version 1.7.0 in JIRA. >> However I've had a few side conversations recently about Spark 2.0, and >> I know I and others have a number of ideas about it already. >> >> I'll go ahead and make 1.7.0, but thought I'd ask, how much other >> interest is there in starting to plan Spark 2.0? is that even on the >> table as the next release after 1.6?
>> >> Sean >> > > -- > Jean-Baptiste Onofré > jbono...@apache.org > http://blog.nanthrax.net > Talend - http://www.talend.com > > - > To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org > For additional commands, e-mail: dev-h...@spark.apache.org > >
Re: Ready to talk about Spark 2.0?
Hi, thanks for the feedback. I'll try to explain better what I meant. First we had RDDs, then we had DataFrames, so could the next step be something like stored procedures over DataFrames? So I define the whole calculation flow, even if it includes any "actions" in between, and the whole thing is planned and executed in a super optimized way once I tell it "go!" What I mean by "feels like scripted" is that actions come back to the driver, like they would if you were in front of a command prompt. But often the flow contains many steps with actions in between - multiple levels of aggregations, iterative machine learning algorithms etc. Sending the whole "workplan" to the Spark framework would be, as I see it, the next step of its evolution, like stored procedures send logic with many SQL queries to the database. Was it clearer this time? :) *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Sun, Nov 8, 2015 at 5:59 PM, Koert Kuipers <ko...@tresata.com> wrote: > romi, > unless am i misunderstanding your suggestion you might be interested in > projects like the new mahout where they try to abstract out the engine with > bindings, so that they can support multiple engines within a single > platform. I guess cascading is heading in a similar direction (although no > spark or flink yet there, just mr1 and tez). > > On Sun, Nov 8, 2015 at 6:33 AM, Sean Owen <so...@cloudera.com> wrote: > >> Major releases can change APIs, yes. Although Flink is pretty similar >> in broad design and goals, the APIs are quite different in >> particulars. Speaking for myself, I can't imagine merging them, as it >> would either mean significantly changing Spark APIs, or making Flink >> use Spark APIs. It would mean effectively removing one project which >> seems infeasible. >> >> I am not sure of what you're saying the difference is, but I would not >> describe Spark as primarily for interactive use.
>> >> Philosophically, I don't think One Big System to Rule Them All is a >> good goal. One project will never get it all right even within one >> niche. It's actually valuable to have many takes on important >> problems. Hence any problem worth solving gets solved 10 times. Just >> look at all those SQL engines and logging frameworks... >> >> On Sun, Nov 8, 2015 at 10:53 AM, Romi Kuntsman <r...@totango.com> wrote: >> > A major release usually means giving up on some API backward >> compatibility? >> > Can this be used as a chance to merge efforts with Apache Flink >> > (https://flink.apache.org/) and create the one ultimate open source >> big data >> > processing system? >> > Spark currently feels like it was made for interactive use (like Python >> and >> > R), and when used others (batch/streaming), it feels like scripted >> > interactive instead of really a standalone complete app. Maybe some base >> > concepts may be adapted? >> > >> > (I'm not currently a committer, but as a heavy Spark user I'd love to >> > participate in the discussion of what can/should be in Spark 2.0) >> > >> > Romi Kuntsman, Big Data Engineer >> > http://www.totango.com >> > >> > On Fri, Nov 6, 2015 at 2:53 PM, Jean-Baptiste Onofré <j...@nanthrax.net> >> > wrote: >> >> >> >> Hi Sean, >> >> >> >> Happy to see this discussion. >> >> >> >> I'm working on PoC to run Camel on Spark Streaming. The purpose is to >> have >> >> an ingestion and integration platform directly running on Spark >> Streaming. >> >> >> >> Basically, we would be able to use a Camel Spark DSL like: >> >> >> >> >> >> >> from("jms:queue:foo").choice().when(predicate).to("job:bar").when(predicate).to("hdfs:path").otherwise("file:path") >> >> >> >> Before a formal proposal (I have to do more work there), I'm just >> >> wondering if such framework can be a new Spark module (Spark >> Integration for >> >> instance, like Spark ML, Spark Stream, etc). 
>> >> >> >> Maybe it could be a good candidate for an addition in a "major" release >> >> like Spark 2.0. >> >> >> >> Just my $0.01 ;) >> >> >> >> Regards >> >> JB >> >> >> >> >> >> On 11/06/2015 01:44 PM, Sean Owen wrote: >> >>> >> >>> Since branch-1.6 is cut, I was going to make version 1.7.0 in JIRA. >> >>> However I've had a few s
Re: Ready to talk about Spark 2.0?
Since it seems we do have so much to talk about Spark 2.0, then the answer to the question "ready to talk about spark 2" is yes. But that doesn't mean the development of the 1.x branch is ready to stop or that there shouldn't be a 1.7 release. Regarding what should go into the next major version - obviously on the technical level it's breaking API changes and perhaps some long-awaited architectural refactoring. But what I think should be the major change is on the conceptual side - the realization that the way interactive, batch and streaming data flows work are fundamentally different, and building the framework around that will benefit each of those flows (like events instead of microbatches in streaming, worker-side intermediate processing in batch, etc). So where is the best way to have a full Spark 2.0 discussion? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Sun, Nov 8, 2015 at 10:10 PM, Mark Hamstra <m...@clearstorydata.com> wrote: > Yes, that's clearer -- at least to me. > > But before going any further, let me note that we are already sliding past > Sean's opening question of "Should we start talking about Spark 2.0?" to > actually start talking about Spark 2.0. I'll try to keep the rest of this > post at a higher- or meta-level in order to attempt to avoid a somewhat > premature discussion of detailed 2.0 proposals, since I think that we do > still need to answer Sean's question and a couple of related questions > before really diving into the details of 2.0 planning. The related > questions that I am talking about are: Is Spark 1.x done except for > bug-fixing? and What would definitely make us say that we must begin > working on Spark 2.0? 
> > I'm not going to try to answer my own two questions even though I'm really > interested in how others will answer them, but I will answer Sean's by > saying that it is a good time to start talking about Spark 2.0 -- which is > quite different from saying that we are close to an understanding of what > will differentiate Spark 2.0 or when we want to deliver it. > > On the meta-2.0 discussion, I think that it is useful to break "Things > that will be different in 2.0" into some distinct categories. I see at > least three such categories for openers, although the third will probably > need to be broken down further. > > The first is the simplest, would take almost no time to complete, and > would have minimal impact on current Spark users. This is simply getting > rid of everything that is already marked deprecated in Spark 1.x but that > we haven't already gotten rid of because of our commitment to maintaining > API stability within major versions. There should be no need for > discussion or apology before getting rid of what is already deprecated -- > it's just gone and it's time to move on. Kind of a category-1.1 are parts > of the current public API that are now marked as Experimental or > Developer that should become part of the fully-supported public API in 2.0 > -- and there is room for debate here. > > The next category of things that will be different in 2.0 isn't a lot > harder to implement, shouldn't take a lot of time to complete, but will > have some impact on current Spark users. I'm talking about areas in the > current code that we know don't work the way we want them to and don't have > the public API that we would like, but for which there aren't or can't be > recommended alternatives yet, so the code isn't formally marked as > deprecated. Again, these are things that we haven't already changed mostly > because of the need to maintain API stability in 1.x.
But because these > haven't already been marked as deprecated, there is potential to catch > existing Spark users by surprise when the API changes. We don't guarantee > API stability across major version number changes, so there isn't any > reason why we can't make the changes we want, but we should start building > up a comprehensive list of API changes that will occur in Spark 2.0 to at > least minimize the amount of surprise for current Spark users. > > I don't already have anything like such a comprehensive list, but one > example of the kind of thing that I am talking about is something that I've > personally been looking at and regretting of late, and that's the > complicated relationships among SparkListener, SQLListener, onJobEnd and > onExecutionEnd. A lot of this complication is because of the need to > maintain the public API, so we end up with comments like this ( > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala#L58): > "Ideally, we need to make sure onExecutionEnd happens after onJobStart and > onJobEnd. However, onJobStart and onJobEnd run in the listen
Re: JMX with Spark
Have you read this? https://spark.apache.org/docs/latest/monitoring.html *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Thu, Nov 5, 2015 at 2:08 PM, Yogesh Vyas <informy...@gmail.com> wrote: > Hi, > How we can use JMX and JConsole to monitor our Spark applications? > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
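For the JMX route specifically, the monitoring guide points at Spark's metrics system. A minimal sketch (assuming the stock conf/metrics.properties layout; the sink class is the one shipped with Spark):

```
# conf/metrics.properties -- enable the built-in JMX sink on all
# instances (master, worker, driver, executor), so their metrics are
# registered as MBeans that JConsole can browse
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
```

JConsole can then be attached to the driver or executor JVM locally, or remotely if the usual com.sun.management.jmxremote options are set on that JVM.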
Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?
I noticed that toJavaRDD causes a computation on the DataFrame, so is it considered an action, even though logically it's a transformation?

On Nov 4, 2015 6:51 PM, "Aliaksei Tsyvunchyk" wrote:
> Hello folks,
>
> Recently I have noticed unexpectedly big network traffic between Driver
> Program and Worker node.
> During debugging I have figured out that it is caused by the following block
> of code
>
> —— Java ———
> DataFrame etpvRecords = context.sql(" SOME SQL query here");
> Mapper m = new Mapper(localValue, ProgramId::toProgId);
> return etpvRecords
>     .toJavaRDD()
>     .map(m::mapHutPutViewingRow)
>     .reduce(Reducer::reduce);
> —— Java ———
>
> I'm using a debug breakpoint and OS X nettop to monitor traffic between
> processes. Before reaching the toJavaRDD() line I see 500Kb of traffic,
> and after executing it I see 2.2 Mb. But when I check the size of the
> result of the reduce function, it is 10 Kb.
> So .toJavaRDD() seems to cause the worker process to return the dataset to
> the driver, and the further map/reduce seems to occur on the driver.
>
> This is definitely not expected by me, so I have 2 questions.
> 1. Is it really expected behavior that DataFrame.toJavaRDD causes the whole
> dataset to return to the driver, or am I doing something wrong?
> 2. What is the expected way to perform transformations on a DataFrame using
> custom Java map/reduce functions, in case the standard SQL features don't
> fit all my needs?
>
> Env: Spark 1.5.1 in standalone mode (1 master, 1 worker sharing the same
> machine). Java 1.8.0_60.
>
> CONFIDENTIALITY NOTICE: This email and files attached to it are
> confidential. If you are not the intended recipient you are hereby notified
> that using, copying, distributing or taking any action in reliance on the
> contents of this information is strictly prohibited. If you have received
> this email in error please notify the sender and delete this email.
Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?
In my program I move between RDD and DataFrame several times. I know that the entire data of the DF doesn't go into the driver because it wouldn't fit there. But calling toJavaRDD does cause computation. Check the number of partitions you have on the DF and RDD... On Nov 4, 2015 7:54 PM, "Aliaksei Tsyvunchyk" <atsyvunc...@exadel.com> wrote: > Hello Romi, > > Do you mean that in my particular case I’m causing computation on > dataFrame or it is regular behavior of DataFrame.toJavaRDD ? > If it’s regular behavior, do you know which approach could be used to > perform make/reduce on dataFrame without causing it to load all data to > driver program ? > > On Nov 4, 2015, at 12:34 PM, Romi Kuntsman <r...@totango.com> wrote: > > I noticed that toJavaRDD causes a computation on the DataFrame, so is it > considered an action, even though logically it's a transformation? > On Nov 4, 2015 6:51 PM, "Aliaksei Tsyvunchyk" <atsyvunc...@exadel.com> > wrote: > >> Hello folks, >> >> Recently I have noticed unexpectedly big network traffic between Driver >> Program and Worker node. >> During debugging I have figured out that it is caused by following block >> of code >> >> —— Java ——— — >> DataFrame etpvRecords = context.sql(" SOME SQL query here"); >> Mapper m = new Mapper(localValue, ProgramId::toProgId); >> return etpvRecords >> .toJavaRDD() >> .map(m::mapHutPutViewingRow) >> .reduce(Reducer::reduce); >> —— Java >> >> I’m using debug breakpoint and OS X nettop to monitor traffic between >> processes. So before approaching line toJavaRDD() I have 500Kb of traffic >> and after executing this line I have 2.2 Mb of traffic. But when I check >> size of result of reduce function it is 10 Kb. >> So .toJavaRDD() seems causing worker process return dataset to driver >> process and seems further map/reduce occurs on Driver. >> >> This is definitely not expected by me, so I have 2 questions. >> 1. 
Is it really expected behavior that DataFrame.toJavaRDD cause whole >> dataset return to driver or I’m doing something wrong? >> 2. What is expected way to perform transformation with DataFrame using >> custom Java map\reduce functions in case if standard SQL features are not >> fit all my needs? >> >> Env: Spark 1.5.1 in standalone mode, (1 master, 1 worker sharing same >> machine). Java 1.8.0_60. >> >> CONFIDENTIALITY NOTICE: This email and files attached to it are >> confidential. If you are not the intended recipient you are hereby notified >> that using, copying, distributing or taking any action in reliance on the >> contents of this information is strictly prohibited. If you have received >> this email in error please notify the sender and delete this email. >> > > > CONFIDENTIALITY NOTICE: This email and files attached to it are > confidential. If you are not the intended recipient you are hereby notified > that using, copying, distributing or taking any action in reliance on the > contents of this information is strictly prohibited. If you have received > this email in error please notify the sender and delete this email. >
Re: Getting Started
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Fri, Oct 30, 2015 at 1:25 PM, Saurabh Shah <shahsaurabh0...@gmail.com> wrote: > Hello, my name is Saurabh Shah and I am a second year undergraduate > student at DA-IICT, Gandhinagar, India. I have quite lately been > contributing towards the open source organizations and I find your > organization the most appropriate one to work on. > > I request you to please guide me through the installation of your codebase > and how to get started to your organization. > > > Thanking You, > > Saurabh Shah. >
Re: Error : - No filesystem for scheme: spark
except "spark.master", do you have "spark://" anywhere in your code or config files?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. <balachandar...@gmail.com> wrote:
> -- Forwarded message --
> From: "Balachandar R.A." <balachandar...@gmail.com>
> Date: 02-Nov-2015 12:53 pm
> Subject: Re: Error : - No filesystem for scheme: spark
> To: "Jean-Baptiste Onofré" <j...@nanthrax.net>
>
> HI JB,
>
> Thanks for the response. Here is the content of my spark-defaults.conf:
>
> # Default system properties included when running spark-submit.
> # This is useful for setting default environmental settings.
>
> # Example:
> spark.master spark://fdoat:7077
> # spark.eventLog.enabled true
> spark.eventLog.dir /home/bala/spark-logs
> # spark.eventLog.dir hdfs://namenode:8021/directory
> # spark.serializer org.apache.spark.serializer.KryoSerializer
> # spark.driver.memory 5g
> # spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
>
> regards
> Bala
>
> On 2 November 2015 at 12:21, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> Hi,
>>
>> do you have something special in conf/spark-defaults.conf (especially on
>> the eventLog directory)?
>>
>> Regards
>> JB
>>
>> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
>>> Can someone tell me at what point this error could come?
>>>
>>> In one of my use cases, I am trying to use a Hadoop custom input format.
>>> Here is my code:
>>>
>>> val hConf: Configuration = sc.hadoopConfiguration
>>> hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
>>> hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
>>> var job = new Job(hConf)
>>> FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"));
>>> var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
>>>   classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
>>> val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }
>>>
>>> The moment I invoke the mapPartitionsWithInputSplit() method, I get the
>>> below error in my spark-submit launch:
>>>
>>> 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>>> 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
>>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
>>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>>>
>>> Any help here to move towards fixing this will be of great help
>>>
>>> Thanks
>>>
>>> Bala
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
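If the culprit turns out to be the event log directory (a guess based on JB's question above, not a confirmed diagnosis), giving every path an explicit filesystem scheme keeps Hadoop's FileSystem from ever being handed a spark:// URL:

```
# spark-defaults.conf -- hedged sketch; paths are the ones from Bala's
# config above. spark:// belongs only in spark.master; paths handed to
# Hadoop APIs should carry file://, hdfs:// or similar.
spark.master            spark://fdoat:7077
spark.eventLog.dir      file:///home/bala/spark-logs
# or, for HDFS:
# spark.eventLog.dir    hdfs://namenode:8021/directory
```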
Some spark apps fail with "All masters are unresponsive", while others pass normally
[adding dev list since it's probably a bug, but I'm not sure how to reproduce it so I can open a bug about it]

Hi,

I have a standalone Spark 1.4.0 cluster with 100s of applications running every day. From time to time, the applications crash with the following error (see below). But at the same time (and also after that), other applications are running, so I can safely assume the master and workers are working.

1. Why is there a NullPointerException? (I can't track the Scala stack trace to the code, but anyway an NPE is usually an obvious bug, even if there's actually a network error...)
2. Why can't it connect to the master? (If it's a network timeout, how do I increase it? I see the values are hardcoded inside AppClient.)
3. How do I recover from this error?

ERROR 01-11 15:32:54,991 SparkDeploySchedulerBackend - Application has been killed. Reason: All masters are unresponsive! Giving up.
ERROR
ERROR 01-11 15:32:55,087 OneForOneStrategy - ERROR logs/error.log
java.lang.NullPointerException
at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

ERROR 01-11 15:32:55,603 SparkContext - Error initializing SparkContext.
ERROR java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)

Thanks!

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com
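On question 3, until the underlying bug is fixed, recovery has to happen in the application: since other applications do connect at the same time, treating context creation as retryable is one option. A minimal sketch with illustrative names (factory stands for whatever builds your SparkContext; this helper is not a Spark API):

```python
import time

def create_context_with_retry(factory, attempts=3, backoff_seconds=5.0):
    """Retry a zero-argument factory (e.g. one that builds a SparkContext)
    when the master is transiently unresponsive. Illustrative helper."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except Exception as error:  # e.g. "All masters are unresponsive!"
            last_error = error
            if attempt < attempts:
                time.sleep(backoff_seconds)
    raise RuntimeError("giving up after %d attempts" % attempts) from last_error
```

Because the stopped SparkContext cannot be reused after this failure, the factory must construct a fresh context on every attempt rather than hand back the old one.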
Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)
Did you try to cache a DataFrame with just a single row?
Do your rows have any columns with null values?
Can you post a code snippet here on how you load/generate the dataframe?
Does dataframe.rdd.cache work?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
> It is not a problem to use JavaRDD.cache() for 200M data (all Objects read form Json Format). But when I try to use DataFrame.cache(), It shown exception in below.
>
> My machine can cache 1 G data in Avro format without any problem.
>
> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in 27.832369 ms
> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
>
> java.lang.NullPointerException
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:500)
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:498)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:127)
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/10/29 13:26:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>
> Thanks,
>
> Jingyu
>
> This message and its attachments may contain legally privileged or confidential information. It is intended solely for the named addressee.
If > you are not the addressee indicated in this message or responsible for > delivery of the message to the addressee, you may not copy or deliver this > message or its attachments to anyone. Rather, you should permanently delete > this message and its attachments and kindly notify the sender by reply > e-mail. Any content of this message and its attachments which does not > relate to the official business of the sending company must be taken not to > have been sent or endorsed by that company or any of its related entities. > No warranty is made that the e-mail or attachments are free from computer > virus or other defect.
Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)
> > BUT, after change limit(500) to limit(1000). The code report NullPointerException.

I had a similar situation, and the problem was with a certain record. Try to find which records are returned when you limit to 1000 but not returned when you limit to 500.

Could it be an NPE thrown from PixelObject?

Are you running spark with master=local, so it's running inside your IDE and you can see the errors from the driver and worker?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Oct 29, 2015 at 10:04 AM, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
> Thanks Romi,
>
> I resize the dataset to 7MB, however, the code show NullPointerException exception as well.
>
> Did you try to cache a DataFrame with just a single row?
>
> Yes, I tried. But, Same problem.
>
> Do your rows have any columns with null values?
>
> No, I had filter out null values before cache the dataframe.
>
> Can you post a code snippet here on how you load/generate the dataframe?
>
> Sure, Here is the working code 1:
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator()).cache();
> System.out.println(pixels.count()); // 3000-4000 rows
>
> Working code 2:
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator());
> DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.class);
> DataFrame totalDF1 = schemaPixel.select(schemaPixel.col("domain")).filter("'domain' is not null").limit(500);
> System.out.println(totalDF1.count());
>
> BUT, after change limit(500) to limit(1000). The code report NullPointerException.
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator());
> DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.class);
> DataFrame totalDF = schemaPixel.select(schemaPixel.col("domain")).filter("'domain' is not null").limit(1000);
> System.out.println(totalDF.count()); // problem at this line
>
> 15/10/29 18:56:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 15/10/29 18:56:28 INFO TaskSchedulerImpl: Cancelling stage 0
> 15/10/29 18:56:28 INFO DAGScheduler: ShuffleMapStage 0 (count at X.java:113) failed in 3.764 s
> 15/10/29 18:56:28 INFO DAGScheduler: Job 0 failed: count at XXX.java:113, took 3.862207 s
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>
> Does dataframe.rdd.cache work?
>
> No, I tried but same exception.
>
> Thanks,
>
> Jingyu
>
> On 29 October 2015 at 17:38, Romi Kuntsman <r...@totango.com> wrote:
>> Did you try to cache a DataFrame with just a single row?
>> Do your rows have any columns with null values?
>> Can you post a code snippet here on how you load/generate the dataframe?
>> Does dataframe.rdd.cache work?
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>>> It is not a problem to use JavaRDD.cache() for 200M data (all Objects read form Json Format). But when I try to use DataFrame.cache(), It shown exception in below.
>>>
>>> My machine can cache 1 G data in Avro format without any problem.
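The limit(500)-vs-limit(1000) narrowing above is a bisection over records. Once the failing range is known, the same idea can be pushed down to a single record by applying the per-row conversion one record at a time. An illustrative sketch in plain Python (convert stands in for whatever is done per row; not Spark code):

```python
def find_first_bad_record(records, convert):
    """Apply `convert` to each record in order and report the index and
    error of the first record that fails, or None if all succeed."""
    for index, record in enumerate(records):
        try:
            convert(record)
        except Exception as error:
            return index, error
    return None
```

For the case above, this would be run over the records between offsets 500 and 1000 of the JavaRDD's data, since that is where the NPE appeared.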
[jira] [Commented] (SPARK-11229) NPE in JoinedRow.isNullAt when spark.shuffle.memoryFraction=0
[ https://issues.apache.org/jira/browse/SPARK-11229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14968662#comment-14968662 ]

Romi Kuntsman commented on SPARK-11229:
---

[~marmbrus] it's reproducible in 1.5.1 as [~xwu0226] confirmed, shouldn't it be marked as "fixed in 1.6.0" instead of "cannot reproduce"?

> NPE in JoinedRow.isNullAt when spark.shuffle.memoryFraction=0
> -
>
> Key: SPARK-11229
> URL: https://issues.apache.org/jira/browse/SPARK-11229
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.5.1
> Environment: 14.04.1-Ubuntu SMP x86_64 GNU/Linux
> Reporter: Romi Kuntsman
>
> Steps to reproduce:
> 1. set spark.shuffle.memoryFraction=0
> 2. load dataframe from parquet file
> 3. see it's read correctly by calling dataframe.show()
> 4. call dataframe.count()
>
> Expected behaviour:
> get count of rows in dataframe
> OR, if memoryFraction=0 is an invalid setting, get notified about it
>
> Actual behaviour:
> CatalystReadSupport doesn't read the schema (even though there is one) and then there's a NullPointerException.
> Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919) > at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) > at 
org.apache.spark.rdd.RDD.collect(RDD.scala:904) > at > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177) > at > org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) > at > org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) > at > org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903) > at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384) > at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1402) > ... 14 more > Caused by: java.lang.NullPointerException > at > org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:70) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown > Source) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator$$anonfun$generateProcessRow$1.apply(TungstenAggregationIterator.scala:194) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator$$anonfun$generateProcessRow$1.apply(TungstenAggregationIterator.scala:192) > at > org.apache.spark.sql.execution.aggregate.TungstenA
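The "get notified about it" expectation in the report amounts to validating the fraction up front instead of failing later with an NPE. A sketch of such a check, with illustrative names (this is not Spark's actual config API):

```python
def validated_fraction(conf, key, default):
    """Read a memory-fraction setting and reject values outside (0, 1]
    with an explicit error, rather than letting an invalid value surface
    later as an unrelated NullPointerException. Illustrative helper;
    `conf` is any dict-like of string settings."""
    value = float(conf.get(key, default))
    if not 0.0 < value <= 1.0:
        raise ValueError("%s must be in (0, 1], got %s" % (key, value))
    return value
```

With a check like this, step 1 of the reproduction (memoryFraction=0) would fail immediately with a message naming the setting, which is the behaviour the reporter asked for.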
[jira] [Commented] (SPARK-7335) Submitting a query to Thrift Server occurs error: java.lang.IllegalStateException: unread block data
[ https://issues.apache.org/jira/browse/SPARK-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14966490#comment-14966490 ] Romi Kuntsman commented on SPARK-7335: -- [~meiyoula] can you please reopen the issue? I got this error as well, and wasted a lot of time realizing it was actually a ClassNotFoundException due to a missing jar - it doesn't say that anywhere in the exception. When the class to be serialized/deserialized is not found, then the exception should explicitly say so. When the message says "unread block data" it's very confusing. Here's a reference from another project who got the same error and many people spent time to realize it was a missing jar: https://github.com/tuplejump/calliope-release/issues/6 > Submitting a query to Thrift Server occurs error: > java.lang.IllegalStateException: unread block data > > > Key: SPARK-7335 > URL: https://issues.apache.org/jira/browse/SPARK-7335 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: meiyoula >Priority: Critical > > java.lang.IllegalStateException: unread block data > at > java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) > at > org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) > at > org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:163) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
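The improvement requested here, an error that names the missing class instead of "unread block data", can be illustrated with Python's analogous deserialization hook (pickle's find_class plays roughly the role of Java's resolveClass; names below are illustrative):

```python
import io
import pickle

class ExplainingUnpickler(pickle.Unpickler):
    """When the class of a serialized object cannot be found, fail with a
    message naming it, instead of an opaque stream-state error."""

    def find_class(self, module, name):
        try:
            return super().find_class(module, name)
        except (ImportError, AttributeError) as error:
            raise RuntimeError(
                "class %s.%s not found; add the jar/module that provides it"
                % (module, name)) from error

def loads_with_explanation(data):
    # Drop-in replacement for pickle.loads that produces the clearer error
    return ExplainingUnpickler(io.BytesIO(data)).load()
```

The same principle applied in Java's ObjectInputStream path would have turned the "unread block data" message into a ClassNotFoundException naming the missing class, saving the debugging time described above.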
[jira] [Created] (SPARK-11229) NPE in JoinedRow.isNullAt when spark.shuffle.memoryFraction=0
Romi Kuntsman created SPARK-11229: - Summary: NPE in JoinedRow.isNullAt when spark.shuffle.memoryFraction=0 Key: SPARK-11229 URL: https://issues.apache.org/jira/browse/SPARK-11229 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.5.1 Environment: 14.04.1-Ubuntu SMP x86_64 GNU/Linux Reporter: Romi Kuntsman Steps to reproduce: 1. set spark.shuffle.memoryFraction=0 2. load dataframe from parquet file 3. see it's read correctly by calling dataframe.show() 4. call dataframe.count() Expected behaviour: get count of rows in dataframe OR, if memoryFraction=0 is an invalid setting, get notified about it Actual behaviour: CatalystReadSupport doesn't read the schema (even though there is one) and then there's a NullPointerException. Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) at org.apache.spark.rdd.RDD.collect(RDD.scala:904) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177) at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384) at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1402) ... 
14 more Caused by: java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:70) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator$$anonfun$generateProcessRow$1.apply(TungstenAggregationIterator.scala:194) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator$$anonfun$generateProcessRow$1.apply(TungstenAggregationIterator.scala:192) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:368) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119
[jira] [Commented] (SPARK-11153) Turns off Parquet filter push-down for string and binary columns
[ https://issues.apache.org/jira/browse/SPARK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14966306#comment-14966306 ]

Romi Kuntsman commented on SPARK-11153:
---

Does this mean that all Spark 1.5.1 users are recommended to set spark.sql.parquet.filterPushdown to false?

> Turns off Parquet filter push-down for string and binary columns
> -
>
> Key: SPARK-11153
> URL: https://issues.apache.org/jira/browse/SPARK-11153
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.5.0, 1.5.1
> Reporter: Cheng Lian
> Assignee: Cheng Lian
> Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
> Due to PARQUET-251, {{BINARY}} columns in existing Parquet files may be written with corrupted statistics information. This information is used by filter push-down optimization. Since Spark 1.5 turns on Parquet filter push-down by default, we may end up with wrong query results. PARQUET-251 has been fixed in parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0.
> Note that this kind of corrupted Parquet files could be produced by any Parquet data models.
> This affects all Spark SQL data types that can be mapped to Parquet {{BINARY}}, namely:
> - {{StringType}}
> - {{BinaryType}}
> - {{DecimalType}} (but Spark SQL doesn't support pushing down {{DecimalType}} columns for now.)
> To avoid wrong query results, we should disable filter push-down for columns of {{StringType}} and {{BinaryType}} until we upgrade to parquet-mr 1.8.
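Until the fix lands (1.5.2 / 1.6.0 per the Fix Versions above), the workaround implied by the question is to turn the feature off explicitly, e.g. in spark-defaults.conf:

```
# Workaround for SPARK-11153 / PARQUET-251 on Spark 1.5.0/1.5.1:
# disable Parquet filter push-down so corrupted BINARY statistics
# cannot produce wrong query results.
spark.sql.parquet.filterPushdown  false
```

The same key can also be set per session with sqlContext.setConf("spark.sql.parquet.filterPushdown", "false").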
[jira] [Created] (SPARK-11228) Job stuck in Executor failure loop when NettyTransport failed to bind
Romi Kuntsman created SPARK-11228: - Summary: Job stuck in Executor failure loop when NettyTransport failed to bind Key: SPARK-11228 URL: https://issues.apache.org/jira/browse/SPARK-11228 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 1.5.1 Environment: 14.04.1-Ubuntu SMP x86_64 GNU/Linux Reporter: Romi Kuntsman I changed my network connection while a local spark cluster is running. In port 8080, I see the master and worker running. I'm running Spark in Java in client mode, so the driver is running inside my IDE. When trying to start a job on the local spark cluster, I get an endless loop of the errors below at #1. It only stops when I kill the application manually. When looking at the worker log, I see an endless loop of the errors below at #2. Expected behaviour would be failing the job after a few failed retries / timeout. (IP anonymized to 1.2.3.4) 1. Errors see on driver: 2015-10-21 11:20:54,793 INFO [org.apache.spark.scheduler.TaskSchedulerImpl] Adding task set 0.0 with 2 tasks 2015-10-21 11:20:55,847 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/1 is now EXITED (Command exited with code 1) 2015-10-21 11:20:55,847 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Executor app-20151021112052-0005/1 removed: Command exited with code 1 2015-10-21 11:20:55,848 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Asked to remove non-existent executor 1 2015-10-21 11:20:55,848 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor added: app-20151021112052-0005/2 on worker-20151021090623-1.2.3.4-57305 (1.2.3.4:57305) with 1 cores 2015-10-21 11:20:55,848 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Granted executor ID app-20151021112052-0005/2 on hostPort 1.2.3.4:57305 with 1 cores, 4.9 GB RAM 2015-10-21 11:20:55,849 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: 
app-20151021112052-0005/2 is now LOADING 2015-10-21 11:20:55,852 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/2 is now RUNNING 2015-10-21 11:20:57,165 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/2 is now EXITED (Command exited with code 1) 2015-10-21 11:20:57,165 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Executor app-20151021112052-0005/2 removed: Command exited with code 1 2015-10-21 11:20:57,166 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Asked to remove non-existent executor 2 2015-10-21 11:20:57,166 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor added: app-20151021112052-0005/3 on worker-20151021090623-1.2.3.4-57305 (1.2.3.4:57305) with 1 cores 2015-10-21 11:20:57,167 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Granted executor ID app-20151021112052-0005/3 on hostPort 1.2.3.4:57305 with 1 cores, 4.9 GB RAM 2015-10-21 11:20:57,167 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/3 is now LOADING 2015-10-21 11:20:57,169 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/3 is now RUNNING 2015-10-21 11:20:58,531 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/3 is now EXITED (Command exited with code 1) 2015-10-21 11:20:58,531 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Executor app-20151021112052-0005/3 removed: Command exited with code 1 2015-10-21 11:20:58,532 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Asked to remove non-existent executor 3 2015-10-21 11:20:58,532 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor added: app-20151021112052-0005/4 on worker-20151021090623-1.2.3.4-57305 (1.2.3.4:57305) 
with 1 cores 2015-10-21 11:20:58,532 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Granted executor ID app-20151021112052-0005/4 on hostPort 1.2.3.4:57305 with 1 cores, 4.9 GB RAM 2015-10-21 11:20:58,533 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/4 is now LOADING 2015-10-21 11:20:58,535 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/4 is now RUNNING 2015-10-21 11:20:59,932 INFO [org.apache.spark.deploy.client.AppClient$ClientEndpoint] Executor updated: app-20151021112052-0005/4 is now EXITED (Command exited with code 1) 2015-10-21 11:20:59,933 INFO [org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend] Executor app
[jira] [Commented] (SPARK-2563) Re-open sockets to handle connect timeouts
[ https://issues.apache.org/jira/browse/SPARK-2563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14955264#comment-14955264 ] Romi Kuntsman commented on SPARK-2563: -- I got a socket timeout in Spark 1.4.0. Is this still relevant for the latest version, or is this bug abandoned? > Re-open sockets to handle connect timeouts > -- > > Key: SPARK-2563 > URL: https://issues.apache.org/jira/browse/SPARK-2563 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Shivaram Venkataraman >Priority: Minor > > In a large EC2 cluster, I often see the first shuffle stage in a job fail due > to connection timeout exceptions. > If the connection attempt times out, the socket gets closed and from [1] we > get a ClosedChannelException. We should check if the Socket was closed due > to a timeout and open a new socket and try to connect. > FWIW, I was able to work around my problems by increasing the number of SYN > retries in Linux. (I ran echo 8 > /proc/sys/net/ipv4/tcp_syn_retries) > [1] > http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/sun/nio/ch/SocketChannelImpl.java?av=h#573 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
Re: How to get a new RDD by ordinarily subtract its adjacent rows
An RDD is a set of data rows (in your case, numbers); there is no inherent meaning to the order of the items. What exactly are you trying to accomplish? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu <zchl.j...@yahoo.com.invalid> wrote: > Dear , > > I have took lots of days to think into this issue, however, without any > success... > I shall appreciate your all kind help. > > There is an RDD rdd1, I would like get a new RDD rdd2, each row > in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] . > What kinds of API or function would I use... > > > Thanks very much! > John > >
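If you do want adjacent differences, you have to make the order explicit, e.g. with zipWithIndex, re-key each element to its successor's index, and join. The sketch below simulates that keyed-join approach in plain Python (no live Spark cluster assumed); in Spark the same steps would be zipWithIndex, a key shift, and a join on the index.

```python
def adjacent_diff(rows):
    """rdd2[i] = rdd1[i] - rdd1[i-1], made possible by an explicit index."""
    indexed = list(enumerate(rows))           # like rdd.zipWithIndex()
    shifted = {i + 1: v for i, v in indexed}  # re-key each value to index+1
    # "join" on the index: each element meets its predecessor
    return [(i, v - shifted[i]) for i, v in indexed if i in shifted]

print(adjacent_diff([3, 7, 12, 20]))  # -> [(1, 4), (2, 5), (3, 8)]
```

A single-element input yields an empty result, since the first row has no predecessor.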
Re: passing SparkContext as parameter
SparkContext is available on the driver, not on executors. To read from Cassandra, you can use something like this: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Sep 21, 2015 at 2:27 PM, Priya Ch <learnings.chitt...@gmail.com> wrote: > can i use this sparkContext on executors ?? > In my application, i have scenario of reading from db for certain records > in rdd. Hence I need sparkContext to read from DB (cassandra in our case), > > If sparkContext couldn't be sent to executors , what is the workaround for > this ?? > > On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com> wrote: > >> add @transient? >> >> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <learnings.chitt...@gmail.com> >> wrote: >> >>> Hello All, >>> >>> How can i pass sparkContext as a parameter to a method in an object. >>> Because passing sparkContext is giving me TaskNotSerializable Exception. >>> >>> How can i achieve this ? >>> >>> Thanks, >>> Padma Ch >>> >> >> >
Re: passing SparkContext as parameter
foreach is something that runs on the driver, not the workers. if you want to perform some function on each record from cassandra, you need to do cassandraRdd.map(func), which will run distributed on the spark workers *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch <learnings.chitt...@gmail.com> wrote: > Yes, but i need to read from cassandra db within a spark > transformation..something like.. > > dstream.forachRDD{ > > rdd=> rdd.foreach { > message => > sc.cassandraTable() > . > . > . > } > } > > Since rdd.foreach gets executed on workers, how can i make sparkContext > available on workers ??? > > Regards, > Padma Ch > > On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu <yuzhih...@gmail.com> wrote: > >> You can use broadcast variable for passing connection information. >> >> Cheers >> >> On Sep 21, 2015, at 4:27 AM, Priya Ch <learnings.chitt...@gmail.com> >> wrote: >> >> can i use this sparkContext on executors ?? >> In my application, i have scenario of reading from db for certain records >> in rdd. Hence I need sparkContext to read from DB (cassandra in our case), >> >> If sparkContext couldn't be sent to executors , what is the workaround >> for this ?? >> >> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com> wrote: >> >>> add @transient? >>> >>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <learnings.chitt...@gmail.com >>> > wrote: >>> >>>> Hello All, >>>> >>>> How can i pass sparkContext as a parameter to a method in an >>>> object. Because passing sparkContext is giving me TaskNotSerializable >>>> Exception. >>>> >>>> How can i achieve this ? >>>> >>>> Thanks, >>>> Padma Ch >>>> >>> >>> >> >
Re: how to send additional configuration to the RDD after it was lazily created
What new information do you know after creating the RDD, that you didn't know at the time of its creation? I think the whole point is that an RDD is immutable; you can't change it once it was created. Perhaps you need to refactor your logic to know the parameters earlier, or create a whole new RDD again. *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Thu, Sep 17, 2015 at 10:07 AM, Gil Vernik <g...@il.ibm.com> wrote: > Hi, > > I have the following case, which i am not sure how to resolve. > > My code uses HadoopRDD and creates various RDDs on top of it > (MapPartitionsRDD, and so on ) > After all RDDs were lazily created, my code "knows" some new information > and i want that "compute" method of the HadoopRDD will be aware of it (at > the point when "compute" method will be called). > What is the possible way 'to send' some additional information to the > compute method of the HadoopRDD after this RDD is lazily created? > I tried to play with configuration, like to perform set("test","111") in > the code and modify the compute method of HadoopRDD with get("test") - but > it's not working, since SparkContext has only a clone of the > configuration and it can't be modified at run time. > > Any thoughts how can i make it? > > Thanks > Gil.
Re: how to get RDD from two different RDDs with cross column
Hi, If I understand correctly: rdd1 contains keys (of type StringDate), rdd2 contains keys and values, and rdd3 should contain all the keys, with the values from rdd2? I think you should make rdd1 and rdd2 PairRDDs, and then use an outer join. Does that make sense? On Mon, Sep 21, 2015 at 8:37 PM Zhiliang Zhu wrote: > Dear Romi, Priya, Sujt and Shivaram and all, > > I have took lots of days to think into this issue, however, without any > enough good solution... > I shall appreciate your all kind help. > > There is an RDD rdd1, and another RDD rdd2, > (rdd2 can be PairRDD, or DataFrame with two columns as ). > StringDate column values from rdd1 and rdd2 are cross but not the same. > > I would like to get a new RDD rdd3, StringDate in rdd3 > would be all from (same) as rdd1, and float in rdd3 would be from rdd2 if > its > StringDate is in rdd2, or else NULL would be assigned. > each row in rdd3[ i ] = , > rdd2[i].StringDate would be same as rdd1[ i ].StringDate, > then rdd2[ i ].float is assigned rdd3[ i ] StringDate part. > What kinds of API or function would I use... > > Thanks very much! > Zhiliang > > >
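The operation being described here (every key from rdd1, paired with the matching value from rdd2 or NULL) is a left outer join. A minimal plain-Python sketch of the semantics; in Spark this would be rdd1.leftOuterJoin(rdd2) on PairRDDs, and the sample dates below are made up for illustration:

```python
def left_outer_join(keys, kv_pairs):
    """Keep every key from `keys`; take the value from kv_pairs, else None."""
    lookup = dict(kv_pairs)
    return [(k, lookup.get(k)) for k in keys]  # None plays the role of NULL

rdd1 = ["2015-09-19", "2015-09-20", "2015-09-21"]
rdd2 = [("2015-09-19", 1.5), ("2015-09-21", 2.5)]
print(left_outer_join(rdd1, rdd2))
# -> [('2015-09-19', 1.5), ('2015-09-20', None), ('2015-09-21', 2.5)]
```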
Re: passing SparkContext as parameter
Cody, that's a great reference! As shown there - the best way to connect to an external database from the workers is to create a connection pool on (each) worker. The driver must pass, via broadcast, the connection string, but not the connection object itself and not the Spark context. On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger <c...@koeninger.org> wrote: > That isn't accurate, I think you're confused about foreach. > > Look at > > > http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd > > > On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman <r...@totango.com> wrote: > >> foreach is something that runs on the driver, not the workers. >> >> if you want to perform some function on each record from cassandra, you >> need to do cassandraRdd.map(func), which will run distributed on the spark >> workers >> >> *Romi Kuntsman*, *Big Data Engineer* >> http://www.totango.com >> >> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch <learnings.chitt...@gmail.com> >> wrote: >> >>> Yes, but i need to read from cassandra db within a spark >>> transformation..something like.. >>> >>> dstream.forachRDD{ >>> >>> rdd=> rdd.foreach { >>> message => >>> sc.cassandraTable() >>> . >>> . >>> . >>> } >>> } >>> >>> Since rdd.foreach gets executed on workers, how can i make sparkContext >>> available on workers ??? >>> >>> Regards, >>> Padma Ch >>> >>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu <yuzhih...@gmail.com> wrote: >>> >>>> You can use broadcast variable for passing connection information. >>>> >>>> Cheers >>>> >>>> On Sep 21, 2015, at 4:27 AM, Priya Ch <learnings.chitt...@gmail.com> >>>> wrote: >>>> >>>> can i use this sparkContext on executors ?? >>>> In my application, i have scenario of reading from db for certain >>>> records in rdd. Hence I need sparkContext to read from DB (cassandra in our >>>> case), >>>> >>>> If sparkContext couldn't be sent to executors , what is the workaround >>>> for this ?? 
>>>> >>>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com> >>>> wrote: >>>> >>>>> add @transient? >>>>> >>>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch < >>>>> learnings.chitt...@gmail.com> wrote: >>>>> >>>>>> Hello All, >>>>>> >>>>>> How can i pass sparkContext as a parameter to a method in an >>>>>> object. Because passing sparkContext is giving me TaskNotSerializable >>>>>> Exception. >>>>>> >>>>>> How can i achieve this ? >>>>>> >>>>>> Thanks, >>>>>> Padma Ch >>>>>> >>>>> >>>>> >>>> >>> >> >
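The pattern from the design-patterns guide linked in this thread can be sketched as follows: broadcast only the connection string, and build the connection on the worker, once per partition rather than once per record. The `Conn` class is a hypothetical stand-in for a real client (e.g. a Cassandra session), and `process_partition` mimics what would run inside rdd.foreachPartition:

```python
class Conn:
    """Hypothetical stand-in for a real DB client (not a real driver API)."""
    def __init__(self, conn_string):
        self.conn_string = conn_string  # strings are cheap to broadcast

    def save(self, record):
        return "%s <- %s" % (self.conn_string, record)

def process_partition(records, conn_string):
    # Runs on a worker: one connection per partition, not per record,
    # and never a serialized SparkContext or connection object.
    conn = Conn(conn_string)
    return [conn.save(r) for r in records]

print(process_partition(["a", "b"], "cassandra://host:9042"))
# -> ['cassandra://host:9042 <- a', 'cassandra://host:9042 <- b']
```

For even fewer setup costs, a real implementation would draw from a static, lazily initialized pool so connections are reused across partitions on the same executor.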
[jira] [Commented] (SPARK-5421) SparkSql throw OOM at shuffle
[ https://issues.apache.org/jira/browse/SPARK-5421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14734561#comment-14734561 ] Romi Kuntsman commented on SPARK-5421: -- does this still happen on the latest version? I got some OOM with Spark 1.4.0 > SparkSql throw OOM at shuffle > - > > Key: SPARK-5421 > URL: https://issues.apache.org/jira/browse/SPARK-5421 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.2.0 >Reporter: Hong Shen > > ExternalAppendOnlyMap if only for the spark job that aggregator isDefined, > but sparkSQL's shuffledRDD haven't define aggregator, so sparkSQL won't spill > at shuffle, it's very easy to throw OOM at shuffle. I think sparkSQL also > need spill at shuffle. > One of the executor's log, here is stderr: > 15/01/27 07:02:19 INFO spark.MapOutputTrackerWorker: Don't have map outputs > for shuffle 1, fetching them > 15/01/27 07:02:19 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker > actor = > Actor[akka.tcp://sparkDriver@10.196.128.140:40952/user/MapOutputTracker#1435377484] > 15/01/27 07:02:19 INFO spark.MapOutputTrackerWorker: Got the output locations > 15/01/27 07:02:19 INFO storage.ShuffleBlockFetcherIterator: Getting 143 > non-empty blocks out of 143 blocks > 15/01/27 07:02:19 INFO storage.ShuffleBlockFetcherIterator: Started 4 remote > fetches in 72 ms > 15/01/27 07:47:29 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED > SIGNAL 15: SIGTERM > here is stdout: > 2015-01-27T07:44:43.487+0800: [Full GC 3961343K->3959868K(3961344K), > 29.8959290 secs] > 2015-01-27T07:45:13.460+0800: [Full GC 3961343K->3959992K(3961344K), > 27.9218150 secs] > 2015-01-27T07:45:41.407+0800: [GC 3960347K(3961344K), 3.0457450 secs] > 2015-01-27T07:45:52.950+0800: [Full GC 3961343K->3960113K(3961344K), > 29.3894670 secs] > 2015-01-27T07:46:22.393+0800: [Full GC 3961118K->3960240K(3961344K), > 28.9879600 secs] > 2015-01-27T07:46:51.393+0800: [Full GC 3960240K->3960213K(3961344K), > 34.1530900 secs] > # > 
# java.lang.OutOfMemoryError: Java heap space > # -XX:OnOutOfMemoryError="kill %p" > # Executing /bin/sh -c "kill 9050"... > 2015-01-27T07:47:25.921+0800: [GC 3960214K(3961344K), 3.3959300 secs]
How to determine the value for spark.sql.shuffle.partitions?
Hi all, The number of partitions greatly affects the speed and efficiency of calculation, in my case in DataFrames/SparkSQL on Spark 1.4.0. Too few partitions with large data cause OOM exceptions. Too many partitions on small data cause a delay due to overhead. How do you programmatically determine the optimal number of partitions and cores in Spark, as a function of: 1. available memory per core 2. number of records in input data 3. average/maximum record size 4. cache configuration 5. shuffle configuration 6. serialization 7. etc? Any general best practices? Thanks! Romi K.
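There is no universal formula, but a common starting heuristic (an assumption here, not an official Spark rule) is to size partitions so each holds on the order of 128 MB of input, while never using fewer partitions than available cores, then tune from there:

```python
import math

def suggest_partitions(input_bytes, total_cores,
                       target_partition_bytes=128 * 1024 * 1024):
    """Rough heuristic: ~128 MB per partition, at least one per core."""
    by_size = math.ceil(input_bytes / target_partition_bytes)
    return max(total_cores, by_size)

print(suggest_partitions(10 * 1024**3, total_cores=16))  # -> 80
```

The 128 MB target and the per-core floor are illustrative defaults; record size, caching, and serialization (points 3, 4, and 6 above) shift the target up or down.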
Re: How to remove worker node but let it finish first?
It's only available in Mesos? I'm using a Spark standalone cluster, is there anything like that there? On Fri, Aug 28, 2015 at 8:51 AM Akhil Das ak...@sigmoidanalytics.com wrote: You can create a custom mesos framework for your requirement, to get you started you can check this out http://mesos.apache.org/documentation/latest/app-framework-development-guide/ Thanks Best Regards On Mon, Aug 24, 2015 at 12:11 PM, Romi Kuntsman r...@totango.com wrote: Hi, I have a Spark standalone cluster with 100s of applications per day, and it changes size (more or fewer workers) at various hours. The driver runs on a separate machine outside the Spark cluster. When a job is running and its worker is killed (because at that hour the number of workers is reduced), it sometimes fails, instead of redistributing the work to other workers. How is it possible to decommission a worker, so that it doesn't receive any new work, but does finish all existing work before shutting down? Thanks!
Re: Exception when S3 path contains colons
Hello, We had the same problem. I've written a blog post with the detailed explanation and workaround: http://labs.totango.com/spark-read-file-with-colon/ Greetings, Romi K. On Tue, Aug 25, 2015 at 2:47 PM Gourav Sengupta gourav.sengu...@gmail.com wrote: I am not quite sure about this but should the notation not be s3n://redactedbucketname/* instead of s3a://redactedbucketname/* The best way is to use s3://bucketname/path/* Regards, Gourav On Tue, Aug 25, 2015 at 10:35 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can change the names, whatever program that is pushing the record must follow the naming conventions. Try to replace : with _ or something. Thanks Best Regards On Tue, Aug 18, 2015 at 10:20 AM, Brian Stempin brian.stem...@gmail.com wrote: Hi, I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0). I'm seeing the exception below when encountering file names that contain colons. Any idea on how to get around this? scala val files = sc.textFile(s3a://redactedbucketname/*) 2015-08-18 04:38:34,567 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with curMem=669367, maxMem=285203496 2015-08-18 04:38:34,568 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory (estimated size 236.5 KB, free 271.1 MB) 2015-08-18 04:38:34,663 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with curMem=911591, maxMem=285203496 2015-08-18 04:38:34,664 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in memory (estimated size 21.0 KB, free 271.1 MB) 2015-08-18 04:38:34,665 INFO [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB) 2015-08-18 04:38:34,667 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from 
textFile at console:21 files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at console:21 scala files.count 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:listStatus(533)) - List status for path: s3a://redactedbucketname/ 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:getFileStatus(684)) - Getting path status for s3a://redactedbucketname/ () java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv at org.apache.hadoop.fs.Path.initialize(Path.java:206) at org.apache.hadoop.fs.Path.init(Path.java:172) at org.apache.hadoop.fs.Path.init(Path.java:94) at org.apache.hadoop.fs.Globber.glob(Globber.java:240) at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29) at 
$iwC$iwC$iwC$iwC$iwC$iwC.init(console:31) at $iwC$iwC$iwC$iwC$iwC.init(console:33) at $iwC$iwC$iwC$iwC.init(console:35) at $iwC$iwC$iwC.init(console:37) at $iwC$iwC.init(console:39) at $iwC.init(console:41) at init(console:43) at .init(console:47) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at
How to remove worker node but let it finish first?
Hi, I have a Spark standalone cluster with 100s of applications per day, and it changes size (more or fewer workers) at various hours. The driver runs on a separate machine outside the Spark cluster. When a job is running and its worker is killed (because at that hour the number of workers is reduced), it sometimes fails, instead of redistributing the work to other workers. How is it possible to decommission a worker, so that it doesn't receive any new work, but does finish all existing work before shutting down? Thanks!
Re: How to overwrite partition when writing Parquet?
Cheng - what if I want to overwrite a specific partition? I'll have to remove the folder, as Hemant suggested... On Thu, Aug 20, 2015 at 1:17 PM Cheng Lian lian.cs@gmail.com wrote: You can apply a filter first to filter out data of needed dates and then append them. Cheng On 8/20/15 4:59 PM, Hemant Bhanawat wrote: How can I overwrite only a given partition or manually remove a partition before writing? I don't know if (and I don't think) there is a way to do that using a mode. But doesn't manually deleting the directory of a particular partition help? For directory structure, check this out... http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman r...@totango.com wrote: Hello, I have a DataFrame, with a date column which I want to use as a partition. Each day I want to write the data for the same date in Parquet, and then read a dataframe for a date range. I'm using: myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir); If I use SaveMode.Append, then writing data for the same partition adds the same data there again. If I use SaveMode.Overwrite, then writing data for a single partition removes all the data for all partitions. How can I overwrite only a given partition or manually remove a partition before writing? Many thanks! Romi K.
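The workaround discussed in this thread, deleting just the one partition directory and then writing with SaveMode.Append, can be sketched like this. The layout follows Spark's partition-discovery convention (<parquetDir>/date=<value>/); the function name and the use of the local filesystem are illustrative (for S3 or HDFS you would call the corresponding filesystem API instead of shutil):

```python
import shutil
from pathlib import Path

def drop_partition(parquet_dir, date_value):
    """Remove a single date=<value> partition directory, if present."""
    part_dir = Path(parquet_dir) / ("date=%s" % date_value)
    if part_dir.exists():
        shutil.rmtree(part_dir)  # drops only this partition's data
    return part_dir
    # afterwards, the job would write the fresh data for that date with
    # myDataframe.write().partitionBy("date").mode(SaveMode.Append)...
```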
[jira] [Created] (SPARK-10135) Percent of pruned partitions is shown wrong
Romi Kuntsman created SPARK-10135: - Summary: Percent of pruned partitions is shown wrong Key: SPARK-10135 URL: https://issues.apache.org/jira/browse/SPARK-10135 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.4.0 Reporter: Romi Kuntsman Priority: Trivial When reading partitioned Parquet in SparkSQL, an info message about the number of pruned partitions is displayed. Actual: Selected 15 partitions out of 181, pruned -1106.7% partitions. Expected: Selected 15 partitions out of 181, pruned 91.71270718232044% partitions. Fix: (i'm a newbie here, so please help make a patch, thanks!) in DataSourceStrategy.scala in method apply() instead of: val percentPruned = (1 - total.toDouble / selected.toDouble) * 100 should be: val percentPruned = (1 - selected.toDouble / total.toDouble) * 100
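The proposed fix can be checked numerically against the numbers in the report: with the ratio inverted, 15 of 181 yields the bogus -1106.7%, while the corrected formula gives about 91.7%:

```python
def percent_pruned(selected, total):
    # corrected formula: fraction of partitions pruned away
    return (1 - selected / total) * 100

def percent_pruned_buggy(selected, total):
    # the original code had the ratio inverted
    return (1 - total / selected) * 100

print(round(percent_pruned(15, 181), 1))        # -> 91.7
print(round(percent_pruned_buggy(15, 181), 1))  # -> -1106.7
```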
How to overwrite partition when writing Parquet?
Hello, I have a DataFrame, with a date column which I want to use as a partition. Each day I want to write the data for the same date in Parquet, and then read a dataframe for a date range. I'm using: myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir); If I use SaveMode.Append, then writing data for the same partition adds the same data there again. If I use SaveMode.Overwrite, then writing data for a single partition removes all the data for all partitions. How can I overwrite only a given partition or manually remove a partition before writing? Many thanks! Romi K.
Re: How to minimize shuffling on Spark dataframe Join?
If you create a PairRDD from the DataFrame, using dataFrame.toJavaRDD().mapToPair(), then you can call partitionBy(someCustomPartitioner) which will partition the RDD by the key (of the pair). Then the operations on it (like joining with another RDD) will consider this partitioning. I'm not sure that DataFrames already support this. On Wed, Aug 12, 2015 at 11:16 AM Abdullah Anwar abdullah.ibn.an...@gmail.com wrote: Hi Hemant, Thank you for your reply. I think source of my dataframe is not partitioned on key, its an avro file where 'id' is a field .. but I don't know how to read a file and at the same time configure partition key. I couldn't find anything on SQLContext.read.load where you can set partition key. or in dataframe where you can set partition key. If it could partition the on the specified key .. will spark put the same partition range on same machine for two different dataframe?? What are the overall tips to join faster? Best Regards, Abdullah On Wed, Aug 12, 2015 at 11:02 AM, Hemant Bhanawat hemant9...@gmail.com wrote: Is the source of your dataframe partitioned on key? As per your mail, it looks like it is not. If that is the case, for partitioning the data, you will have to shuffle the data anyway. Another part of your question is - how to co-group data from two dataframes based on a key? I think for RDD's cogroup in PairRDDFunctions is a way. I am not sure if something similar is available for DataFrames. Hemant On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar abdullah.ibn.an...@gmail.com wrote: I have two dataframes like this student_rdf = (studentid, name, ...) student_result_rdf = (studentid, gpa, ...) we need to join this two dataframes. we are now doing like this, student_rdf.join(student_result_rdf, student_result_rdf[studentid] == student_rdf[studentid]) So it is simple. 
But it creates lots of data shuffling across worker nodes, but as joining key is similar and if the dataframe could (understand the partitionkey) be partitioned using that key (studentid) then there suppose not to be any shuffling at all. As similar data (based on partition key) would reside in similar node. is it possible, to hint spark to do this? So, I am finding the way to partition data based on a column while I read a dataframe from input. And If it is possible that Spark would understand that two partitionkey of two dataframes are similar, then how? -- Abdullah -- Abdullah
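The shuffle-avoidance idea in this thread is that both datasets use the same partitioner, so equal keys land in same-numbered partitions on the same nodes. A plain-Python sketch of a hash partitioner, mimicking what RDD.partitionBy does (integer student ids are used so the bucketing is deterministic):

```python
def partition_by(pairs, num_partitions):
    """Mimic a hash partitioner: bucket (key, value) pairs by hash(key)."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[hash(key) % num_partitions].append((key, value))
    return parts

students = [(1, "alice"), (2, "bob"), (3, "carol")]
results = [(1, 3.9), (2, 3.1), (3, 3.5)]
p1 = partition_by(students, 4)
p2 = partition_by(results, 4)
# equal student ids always land in the same-numbered bucket, so a join
# can proceed partition-by-partition with no cross-node shuffle
```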
Re: Issues with S3 paths that contain colons
I had the exact same issue, and overcame it by overriding NativeS3FileSystem with my own class, where I replaced the implementation of globStatus. It's a hack but it works. Then I set the Hadoop config fs.myschema.impl to my class name, and accessed the files through myschema:// instead of s3n:// @Override public FileStatus[] globStatus(final Path pathPattern, final PathFilter filter) throws IOException { final FileStatus[] statusList = super.listStatus(pathPattern); final List<FileStatus> result = Lists.newLinkedList(); for (FileStatus fileStatus : statusList) { if (filter.accept(fileStatus.getPath())) { result.add(fileStatus); } } return result.toArray(new FileStatus[] {}); } On Wed, Aug 19, 2015 at 9:14 PM Steve Loughran ste...@hortonworks.com wrote: you might want to think about filing a JIRA on issues.apache.org against HADOOP here, component being fs/s3. That doesn't mean it is fixable, only known. Every FS has its own set of forbidden characters in filenames; unix doesn't allow files named "."; windows doesn't allow files called COM1, ..., so hitting some filesystem rule is sometimes a problem. Here, though, you've got the file in S3, the listing finds it, but other bits of the codepath are failing -which implies that it is something in the Hadoop libs. On 18 Aug 2015, at 08:20, Brian Stempin brian.stem...@gmail.com wrote: Hi, I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0). I'm seeing the exception below when encountering file names that contain colons. Any idea on how to get around this? 
scala val files = sc.textFile(s3a://redactedbucketname/*) 2015-08-18 04:38:34,567 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with curMem=669367, maxMem=285203496 2015-08-18 04:38:34,568 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory (estimated size 236.5 KB, free 271.1 MB) 2015-08-18 04:38:34,663 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with curMem=911591, maxMem=285203496 2015-08-18 04:38:34,664 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in memory (estimated size 21.0 KB, free 271.1 MB) 2015-08-18 04:38:34,665 INFO [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB) 2015-08-18 04:38:34,667 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at console:21 files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at console:21 scala files.count 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:listStatus(533)) - List status for path: s3a://redactedbucketname/ 2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:getFileStatus(684)) - Getting path status for s3a://redactedbucketname/ () java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv at org.apache.hadoop.fs.Path.initialize(Path.java:206) at org.apache.hadoop.fs.Path.init(Path.java:172) at org.apache.hadoop.fs.Path.init(Path.java:94) at org.apache.hadoop.fs.Globber.glob(Globber.java:240) at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700) at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24) at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29) at $iwC$iwC$iwC$iwC$iwC$iwC.init(console:31) at $iwC$iwC$iwC$iwC$iwC.init(console:33) at $iwC$iwC$iwC$iwC.init(console:35) at $iwC$iwC$iwC.init(console:37) at
Re: spark as a lookup engine for dedup
An RDD is immutable; it cannot be changed, you can only create a new one from data or from a transformation. It sounds inefficient to create a new one every 15 seconds covering the last 24 hours. I think a key-value store is much better suited for this purpose. On Mon, Jul 27, 2015 at 11:21 AM Shushant Arora shushantaror...@gmail.com wrote: its for 1 day events in range of 1 billions and processing is in streaming application of ~10-15 sec interval so lookup should be fast. RDD need to be updated with new events and old events of current time-24 hours back should be removed at each processing. So is spark RDD not fit for this requirement? On Mon, Jul 27, 2015 at 1:08 PM, Romi Kuntsman r...@totango.com wrote: What is the throughput of processing, and for how long do you need to remember duplicates? You can take all the events, put them in an RDD, group by the key, and then process each key only once. But if you have a long running application where you want to check that you didn't see the same value before, and check that for every value, you probably need a key-value store, not an RDD. On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora shushantaror...@gmail.com wrote: Hi I have a requirement for processing large events but ignoring duplicate at the same time. Events are consumed from kafka and each event has a eventid. It may happen that an event is already processed and came again at some other offset. 1.Can I use Spark RDD to persist processed events and then lookup with this rdd (How to do lookup inside a RDD ?I have a JavaPairRDDeventid,timestamp ) while processing new events and if event is present in persisted rdd ignore it , else process the even. Does rdd.lookup(key) on billion of events will be efficient ? 2. update the rdd (Since RDD is immutable how to update it)? Thanks
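A key-value store with expiry is the natural fit; the logic can be sketched in plain Python as a map from event id to timestamp with a 24-hour eviction window. In production this would be a store with TTL support (e.g. Redis or Cassandra), not an in-memory dict, and the linear eviction scan here is for clarity only:

```python
DAY = 24 * 3600

def process_new(events, seen, window=DAY):
    """events: (event_id, timestamp) pairs; seen: event_id -> last seen time.
    Returns only events not seen within the window; mutates `seen`."""
    fresh = []
    for eid, ts in events:
        # evict entries older than the dedup window
        for stale in [k for k, t in seen.items() if ts - t > window]:
            del seen[stale]
        if eid not in seen:
            seen[eid] = ts
            fresh.append((eid, ts))
    return fresh

seen = {}
print(process_new([("e1", 0), ("e2", 10), ("e1", 20)], seen))
# -> [('e1', 0), ('e2', 10)]   e1 at t=20 is a duplicate
```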
Re: spark as a lookup engine for dedup
What is the throughput of processing, and for how long do you need to remember duplicates? You can take all the events, put them in an RDD, group by the key, and then process each key only once. But if you have a long-running application where you want to check, for every value, that you didn't see the same value before, you probably need a key-value store, not an RDD. On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora shushantaror...@gmail.com wrote: Hi, I have a requirement for processing a large number of events while ignoring duplicates at the same time. Events are consumed from Kafka and each event has an eventid. It may happen that an event was already processed and comes again at some other offset. 1. Can I use a Spark RDD to persist processed events and then look up against this RDD (how to do a lookup inside an RDD? I have a JavaPairRDD<eventid, timestamp>) while processing new events, and if an event is present in the persisted RDD ignore it, else process the event. Will rdd.lookup(key) on billions of events be efficient? 2. How to update the RDD (since an RDD is immutable)? Thanks
Re: Scaling spark cluster for a running application
Are you running the Spark cluster in standalone mode or on YARN? In standalone mode, the application gets the available resources when it starts. With YARN, you can try turning on the setting *spark.dynamicAllocation.enabled* See https://spark.apache.org/docs/latest/configuration.html On Wed, Jul 22, 2015 at 2:20 PM phagunbaya phagun.b...@falkonry.com wrote: I have a Spark cluster running in client mode with the driver outside the Spark cluster. I want to scale the cluster after an application is submitted. In order to do this, I'm creating new workers and they are getting registered with the master, but the issue I'm seeing is that the running application does not use the newly added worker. Hence I cannot add more resources to the existing running application. Is there any other way or config to deal with this use-case? How to make a running application ask for executors from a newly issued worker node? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-spark-cluster-for-a-running-application-tp23951.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
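For reference, turning on dynamic allocation on YARN involves a few related settings along these lines (a sketch; the executor counts are made-up examples, and the external shuffle service must also be set up on each NodeManager):

```properties
# spark-defaults.conf (sketch)
spark.dynamicAllocation.enabled        true
spark.shuffle.service.enabled          true
spark.dynamicAllocation.minExecutors   2
spark.dynamicAllocation.maxExecutors   20
```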
Applications metrics unseparatable from Master metrics?
Hi, I tried to enable Master metrics source (to get number of running/waiting applications etc), and connected it to Graphite. However, when these are enabled, application metrics are also sent. Is it possible to separate them, and send only master metrics without applications? I see that Master class is registering both: https://github.com/apache/spark/blob/b9a922e260bec1b211437f020be37fab46a85db0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L91 Thanks, RK.
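For context, the master metrics source is typically enabled through conf/metrics.properties with an instance-scoped sink along these lines (a sketch; the Graphite host and port are placeholders). Because the Master registers both the master source and the applications source under the same instance, as the linked code shows, this instance scoping alone does not separate them:

```properties
# conf/metrics.properties (sketch)
master.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
master.sink.graphite.host=graphite.example.com
master.sink.graphite.port=2003
master.sink.graphite.period=10
```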
Re: Timestamp functions for sqlContext
Hi Tal, I'm not sure there is currently a built-in function for it, but you can easily define a UDF (user defined function) by extending org.apache.spark.sql.api.java.UDF1, registering it (sparkContext.udf().register(...)), and then use it inside your query. RK. On Tue, Jul 21, 2015 at 7:04 PM Tal Rozen t...@scaleka.com wrote: Hi, I'm running a query with sql context where one of the fields is of type java.sql.Timestamp. I'd like to set a function similar to DATEDIFF in mysql, between the date given in each row, and now. So If I was able to use the same syntax as in mysql it would be: val date_diff_df = sqlContext.sql(select DATEDIFF(curdate(), rowTimestamp) date_diff from tableName) What are the relevant key words to replace curdate(), and DATEDIFF? Thanks
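A minimal sketch of the suggested approach, assuming Spark 1.x. The table name tableName and column rowTimestamp come from the question; everything else (app name, sample data) is hypothetical. It registers a plain Scala function rather than implementing the Java UDF1 interface, which achieves the same thing from Scala:

```scala
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Event(name: String, rowTimestamp: Timestamp)

object DateDiffUdfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("udf-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // hypothetical sample row: one event timestamped one day ago
    sc.parallelize(Seq(Event("a", new Timestamp(System.currentTimeMillis() - 86400000L))))
      .toDF().registerTempTable("tableName")

    // whole days between the row's timestamp and now,
    // a rough equivalent of MySQL's DATEDIFF(curdate(), rowTimestamp)
    sqlContext.udf.register("dateDiff", (ts: Timestamp) =>
      TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis() - ts.getTime))

    sqlContext.sql("SELECT dateDiff(rowTimestamp) AS date_diff FROM tableName").show()
    sc.stop()
  }
}
```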
Spark Application stuck retrying task failed on Java heap space?
Hello, *TL;DR: task crashes with OOM, but application gets stuck in infinite loop retrying the task over and over again instead of failing fast.* Using Spark 1.4.0, standalone, with DataFrames on Java 7. I have an application that does some aggregations. I played around with shuffling settings, which led to the dreaded Java heap space error. See the stack trace at the end of this message. When this happens, I see 10's of executors in EXITED state, a couple in LOADING and one in RUNNING. All of them are retrying the same task all over again, and keep failing with the same Java heap space error. This goes on for hours! Why doesn't the whole application fail, when the individual executors keep failing with the same error? Thanks, Romi K. --- end of the log in a failed task: 15/07/21 11:13:40 INFO executor.Executor: Finished task 117.0 in stage 218.1 (TID 305). 2000 bytes result sent to driver 15/07/21 11:13:41 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 306 15/07/21 11:13:41 INFO executor.Executor: Running task 0.0 in stage 219.1 (TID 306) 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Updating epoch to 420 and clearing cache 15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 8 15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(5463) called with curMem=285917, maxMem=1406164008 15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 5.3 KB, free 1340.7 MB) 15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Reading broadcast variable 8 took 22 ms 15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(10880) called with curMem=291380, maxMem=1406164008 15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 10.6 KB, free 1340.7 MB) 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 136, fetching them 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Doing 
the fetch; tracker endpoint = AkkaRpcEndpointRef(Actor[akka.tcp:// sparkDriver@1.2.3.4:57490/user/MapOutputTracker#-99712578]) 15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Got the output locations 15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Getting 182 non-empty blocks out of 182 blocks 15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Started 4 remote fetches in 28 ms 15/07/21 11:14:34 ERROR executor.Executor: Exception in task 0.0 in stage 219.1 (TID 306) java.lang.OutOfMemoryError: Java heap space at scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:99) at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:47) at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:83) at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:47) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:192) at 
org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:190) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at
Re: One corrupt gzip in a directory of 100s
What about communication errors, as opposed to corrupted files? Both when reading input and when writing output. We currently experience a failure of the entire process if the last stage of writing the output (to Amazon S3) fails because of a very temporary DNS resolution issue (easily resolved by retrying). *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Wed, Apr 1, 2015 at 12:58 PM, Gil Vernik g...@il.ibm.com wrote: I actually saw the same issue, where we analyzed some container with a few hundred GBs of zip files - one was corrupted and Spark exited with an Exception on the entire job. I like SPARK-6593, since it can also cover additional cases, not just corrupted zip files. From: Dale Richardson dale...@hotmail.com To: dev@spark.apache.org dev@spark.apache.org Date: 29/03/2015 11:48 PM Subject: One corrupt gzip in a directory of 100s Recently an incident was reported to me where somebody was analysing a directory of gzipped log files, and was struggling to load them into Spark because one of the files was corrupted - calling sc.textFile('hdfs:///logs/*.gz') caused an IOException on the particular executor that was reading that file, which caused the entire job to be cancelled after the retry count was exceeded, without any way of catching and recovering from the error. While normally I think it is entirely appropriate to stop execution if something is wrong with your input, sometimes it is useful to analyse what you can get (as long as you are aware that input has been skipped), and treat corrupt files as acceptable losses. To cater for this particular case I've added SPARK-6593 (PR at https://github.com/apache/spark/pull/5250), which adds an option (spark.hadoop.ignoreInputErrors) to log exceptions raised by the Hadoop input format, but to continue on with the next task.
Ideally in this case you would want to report the corrupt file paths back to the master so they could be dealt with in a particular way (e.g. moved to a separate directory), but that would require a public API change/addition. I was pondering an addition to Spark's hadoop API that could report processing status back to the master via an optional accumulator that collects filepath/Option(exception message) tuples, so the user has some idea of what files are being processed, and what files are being skipped. Regards, Dale.
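If the option proposed in SPARK-6593 were merged as described above, enabling it would presumably be a one-line setting (hypothetical — this reflects the flag name from the PR discussion, not a released Spark option):

```properties
# spark-defaults.conf (hypothetical, per the SPARK-6593 proposal)
spark.hadoop.ignoreInputErrors  true
```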
[jira] [Commented] (SPARK-2579) Reading from S3 returns an inconsistent number of items with Spark 0.9.1
[ https://issues.apache.org/jira/browse/SPARK-2579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14324285#comment-14324285 ] Romi Kuntsman commented on SPARK-2579: -- Does this still happen with Spark 1.2.1? Reading from S3 returns an inconsistent number of items with Spark 0.9.1 Key: SPARK-2579 URL: https://issues.apache.org/jira/browse/SPARK-2579 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 0.9.1 Reporter: Eemil Lagerspetz Priority: Critical Labels: hdfs, read, s3, skipping I have created a random matrix of 1M rows with 10K items on each row, semicolon-separated. While reading it with Spark 0.9.1 and doing a count, I consistently get less than 1M rows, and a different number every time at that ( !! ). Example below: head -n 1 tool-generate-random-matrix*log ==> tool-generate-random-matrix-999158.log <== Row item counts: 999158 ==> tool-generate-random-matrix.log <== Row item counts: 997163 The data is split into 1000 partitions. When I download it using s3cmd sync, and run the following AWK on it, I get the correct number of rows in each partition (1000x1000 = 1M). What is up?
{code:title=checkrows.sh|borderStyle=solid}
for k in part-0*
do
  echo $k
  awk -F ';' 'NF != 1 { print "Wrong number of items:", NF } END { if (NR != 1000) { print "Wrong number of rows:", NR } }' $k
done
{code} The matrix generation and counting code is below: {code:title=Matrix.scala|borderStyle=solid}
package fi.helsinki.cs.nodes.matrix

import java.util.Random
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel._

object GenerateRandomMatrix {
  def NewGeMatrix(rSeed: Int, rdd: RDD[Int], features: Int) = {
    rdd.mapPartitions(part => part.map(xarr => {
      val rdm = new Random(rSeed + xarr)
      val arr = new Array[Double](features)
      for (i <- 0 until features) arr(i) = rdm.nextDouble()
      new Row(xarr, arr)
    }))
  }

  case class Row(id: Int, elements: Array[Double]) {}

  def rowFromText(line: String) = {
    val idarr = line.split(" ")
    val arr = idarr(1).split(";")
    // -1 to fix saved matrix indexing error
    new Row(idarr(0).toInt - 1, arr.map(_.toDouble))
  }

  def main(args: Array[String]) {
    val master = args(0)
    val tasks = args(1).toInt
    val savePath = args(2)
    val read = args.contains("read")
    val datapoints = 100
    val features = 1
    val sc = new SparkContext(master, "RandomMatrix")
    if (read) {
      val randomMatrix: RDD[Row] = sc.textFile(savePath, tasks).map(rowFromText).persist(MEMORY_AND_DISK)
      println("Row item counts: " + randomMatrix.count)
    } else {
      val rdd = sc.parallelize(0 until datapoints, tasks)
      val bcSeed = sc.broadcast(128)
      /* Generating a matrix of random Doubles */
      val randomMatrix = NewGeMatrix(bcSeed.value, rdd, features).persist(MEMORY_AND_DISK)
      randomMatrix.map(row => row.id + " " + row.elements.mkString(";")).saveAsTextFile(savePath)
    }
    sc.stop
  }
}
{code} I run this with: appassembler/bin/tool-generate-random-matrix master 1000 s3n://keys@path/to/data 1>matrix.log 2>matrix.err Reading from HDFS gives the right count and right number of items on each row.
However, I had to run with the full path with the server name; just /matrix does not work (it thinks I want file://): p=hdfs://ec2-54-188-6-77.us-west-2.compute.amazonaws.com:9000/matrix appassembler/bin/tool-generate-random-matrix $( cat /root/spark-ec2/cluster-url ) 1000 $p read 1>readmatrix.log 2>readmatrix.err -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4879) Missing output partitions after job completes with speculative execution
[ https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14324278#comment-14324278 ] Romi Kuntsman commented on SPARK-4879: -- Could this happen very very rarely when not using speculative execution? Once in a long while, I have a situation where the OutputCommitter says it wrote the file successfully, but the output file doesn't appear there. Missing output partitions after job completes with speculative execution Key: SPARK-4879 URL: https://issues.apache.org/jira/browse/SPARK-4879 Project: Spark Issue Type: Bug Components: Input/Output, Spark Core Affects Versions: 1.0.2, 1.1.1, 1.2.0 Reporter: Josh Rosen Assignee: Josh Rosen Priority: Critical Labels: backport-needed Fix For: 1.3.0 Attachments: speculation.txt, speculation2.txt When speculative execution is enabled ({{spark.speculation=true}}), jobs that save output files may report that they have completed successfully even though some output partitions written by speculative tasks may be missing. h3. Reproduction This symptom was reported to me by a Spark user and I've been doing my own investigation to try to come up with an in-house reproduction. I'm still working on a reliable local reproduction for this issue, which is a little tricky because Spark won't schedule speculated tasks on the same host as the original task, so you need an actual (or containerized) multi-host cluster to test speculation. 
Here's a simple reproduction of some of the symptoms on EC2, which can be run in {{spark-shell}} with {{--conf spark.speculation=true}}: {code}
// Rig a job such that all but one of the tasks complete instantly
// and one task runs for 20 seconds on its first attempt and instantly
// on its second attempt:
val numTasks = 100
sc.parallelize(1 to numTasks, numTasks).repartition(2).mapPartitionsWithContext { case (ctx, iter) =>
  if (ctx.partitionId == 0) { // If this is the one task that should run really slow
    if (ctx.attemptId == 0) { // If this is the first attempt, run slow
      Thread.sleep(20 * 1000)
    }
  }
  iter
}.map(x => (x, x)).saveAsTextFile("/test4")
{code} When I run this, I end up with a job that completes quickly (due to speculation) but reports failures from the speculated task: {code} [...] 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Finished task 37.1 in stage 3.0 (TID 411) in 131 ms on ip-172-31-8-164.us-west-2.compute.internal (100/100) 14/12/11 01:41:13 INFO scheduler.DAGScheduler: Stage 3 (saveAsTextFile at <console>:22) finished in 0.856 s 14/12/11 01:41:13 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:22, took 0.885438374 s 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Ignoring task-finished event for 70.1 in stage 3.0 because task 70 has already completed successfully scala> 14/12/11 01:41:13 WARN scheduler.TaskSetManager: Lost task 49.1 in stage 3.0 (TID 413, ip-172-31-8-164.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201412110141_0003_m_49_413 org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160) org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172) org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132) org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:109) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:991)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) {code} One interesting thing to note about this stack trace: if we look at {{FileOutputCommitter.java:160}} ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#160]), this point in the execution seems to correspond to a case where a task completes, attempts to commit its output, fails for some reason, then deletes the destination file, tries again, and fails: {code} if (fs.isFile(taskOutput)) { 152 Path
Re: Guava 11 dependency issue in Spark 1.2.0
I have recently encountered a similar problem with Guava version collision with Hadoop. Isn't it more correct to upgrade Hadoop to use the latest Guava? Why are they staying in version 11, does anyone know? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera niranda.per...@gmail.com wrote: Hi Sean, I removed the hadoop dependencies from the app and ran it on the cluster. It gives a java.io.EOFException 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with curMem=0, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 173.0 KB, free 1911.2 MB) 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with curMem=177166, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.9 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB) 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile at AvroRelation.scala:45 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at SparkPlan.scala:84 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at SparkPlan.scala:84) with 2 output partitions (allowLocal=false) 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:84) 15/01/07 11:19:29 INFO DAGScheduler: Parents of final stage: List() 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List() 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84), which has no missing parents 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with curMem=202668, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block 
broadcast_1 stored as values in memory (estimated size 4.8 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with curMem=207532, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB) 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84) 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.100.5.109): java.io.EOFException at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722) at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009) at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63) at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101) at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216) at org.apache.hadoop.io.UTF8.readString(UTF8.java:208) at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at 
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893
Re: Spark client reconnect to driver in yarn-cluster deployment mode
"In yarn-client mode it only controls the environment of the executor launcher." So you either use yarn-client mode, and then your app keeps running and controlling the process; or you use yarn-cluster mode, and then you send a jar to YARN, and that jar should have code to report the result back to you. *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Thu, Jan 15, 2015 at 1:52 PM, preeze etan...@gmail.com wrote: From the official spark documentation (http://spark.apache.org/docs/1.2.0/running-on-yarn.html): In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. Is there any designed way that the client connects back to the driver (still running in YARN) for collecting results at a later stage? -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-client-reconnect-to-driver-in-yarn-cluster-deployment-mode-tp10122.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Guava 11 dependency issue in Spark 1.2.0
Actually there is already someone on Hadoop-Common-Dev taking care of removing the old Guava dependency http://mail-archives.apache.org/mod_mbox/hadoop-common-dev/201501.mbox/browser https://issues.apache.org/jira/browse/HADOOP-11470 *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Jan 19, 2015 at 4:03 PM, Romi Kuntsman r...@totango.com wrote: I have recently encountered a similar problem with Guava version collision with Hadoop. Isn't it more correct to upgrade Hadoop to use the latest Guava? Why are they staying in version 11, does anyone know? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera niranda.per...@gmail.com wrote: Hi Sean, I removed the hadoop dependencies from the app and ran it on the cluster. It gives a java.io.EOFException 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with curMem=0, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 173.0 KB, free 1911.2 MB) 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with curMem=177166, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.9 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB) 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile at AvroRelation.scala:45 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at SparkPlan.scala:84 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at SparkPlan.scala:84) with 2 output partitions (allowLocal=false) 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:84) 15/01/07 11:19:29 INFO DAGScheduler: 
Parents of final stage: List() 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List() 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84), which has no missing parents 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with curMem=202668, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.8 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with curMem=207532, maxMem=2004174766 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1911.1 MB) 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB) 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84) 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes) 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.100.5.109): java.io.EOFException at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722) at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009) at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63) at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101) at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216) at org.apache.hadoop.io.UTF8.readString(UTF8.java:208) at 
org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969) at java.io.ObjectInputStream.readSerialData
Re: Announcing Spark 1.1.1!
About version compatibility and upgrade path - can the Java application dependencies and the Spark server be upgraded separately (i.e. will a 1.1.0 library work with a 1.1.1 server, and vice versa), or do they need to be upgraded together? Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Dec 2, 2014 at 11:36 PM, Andrew Or and...@databricks.com wrote: I am happy to announce the availability of Spark 1.1.1! This is a maintenance release with many bug fixes, most of which are concentrated in the core. This list includes various fixes to sort-based shuffle, memory leaks, and spilling issues. Contributions from this release came from 55 developers. Visit the release notes [1] to read about the new features, or download [2] the release today. [1] http://spark.apache.org/releases/spark-release-1-1-1.html [2] http://spark.apache.org/downloads.html Please e-mail me directly for any typos in the release notes or name listing. Thanks to everyone who contributed, and congratulations! -Andrew
ExternalAppendOnlyMap: Thread spilling in-memory map of to disk many times slowly
Hello, I have a large data calculation in Spark, distributed across several nodes. In the end, I want to write to a single output file. For this I do: output.coalesce(1, false).saveAsTextFile(filename). What happens is that all the data from the workers flows to a single worker, and that one writes the data. If the data is small enough, it all goes well. However, for an RDD above a certain size, I get a lot of the following messages (see below). From what I understand, ExternalAppendOnlyMap spills the data to disk when it can't hold it in memory. Is there a way to tell it to stream the data right to disk, instead of spilling each block slowly? 14/11/24 12:54:59 INFO MapOutputTrackerWorker: Got the output locations 14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 69 non-empty blocks out of 90 blocks 14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 22 ms 14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 70 non-empty blocks out of 90 blocks 14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 4 ms 14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (1 time so far) 14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 12 MB to disk (2 times so far) 14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 12 MB to disk (3 times so far) [...trimmed...] 
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 69 non-empty blocks out of 90 blocks 14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 2 ms 14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 15 MB to disk (1 time so far) 14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 16 MB to disk (2 times so far) 14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 14 MB to disk (3 times so far) [...trimmed...] 14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (33 times so far) 14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (34 times so far) 14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 13 MB to disk (35 times so far) 14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 69 non-empty blocks out of 90 blocks 14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 3 remote fetches in 4 ms 14/11/24 13:13:40 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 10 MB to disk (1 time so far) 14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 10 MB to disk (2 times so far) 14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 9 MB to disk (3 times so far) [...trimmed...] 
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 12 MB to disk (36 times so far) 14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory map of 11 MB to disk (37 times so far) 14/11/24 13:13:56 INFO FileOutputCommitter: Saved output of task 'attempt_201411241250__m_00_90' to s3n://mybucket/mydir/output *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
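One workaround for the single-writer bottleneck described above is to skip coalesce(1, false), let each task write its own part file in parallel, and concatenate afterwards. A minimal pure-Python sketch of the merge step, assuming a local directory of Spark-style part-* files (on S3 the same idea would use the S3 API or hadoop fs -getmerge; paths here are illustrative):

```python
import glob
import os


def merge_part_files(output_dir, merged_path):
    """Concatenate Spark-style part-* files into a single output file.

    Sorting the part names preserves Spark's partition order
    (part-00000, part-00001, ...).
    """
    parts = sorted(glob.glob(os.path.join(output_dir, "part-*")))
    with open(merged_path, "wb") as out:
        for part in parts:
            with open(part, "rb") as src:
                out.write(src.read())
```

This keeps the shuffle and the writes distributed; only the cheap final concatenation is serial.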
[jira] [Commented] (SPARK-2867) saveAsHadoopFile() in PairRDDFunction.scala should allow use other OutputCommiter class
[ https://issues.apache.org/jira/browse/SPARK-2867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14207997#comment-14207997 ] Romi Kuntsman commented on SPARK-2867: -- In the latest code, it seems to be resolved:
// Use configured output committer if already set
if (conf.getOutputCommitter == null) {
  hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L934 saveAsHadoopFile() in PairRDDFunction.scala should allow use other OutputCommiter class --- Key: SPARK-2867 URL: https://issues.apache.org/jira/browse/SPARK-2867 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0, 1.1.0 Reporter: Joseph Su Priority: Minor The saveAsHadoopFile() in PairRDDFunction.scala hard-codes the OutputCommitter class as FileOutputCommitter because of the following code in the source: hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) However, the OutputCommitter is a configurable option in a regular Hadoop MapReduce program. Users can specify mapred.output.committer.class to change the committer class used by other Hadoop programs. The saveAsHadoopFile() function should remove this hard-coded assignment and provide a way to specify the OutputCommitter used here. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
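The guard quoted above boils down to a simple fallback: install the default FileOutputCommitter only when no committer was configured. A minimal Python sketch of that selection logic (the function name and stand-alone form are illustrative, not Spark's API):

```python
DEFAULT_COMMITTER = "org.apache.hadoop.mapred.FileOutputCommitter"


def effective_committer(configured):
    """Return the user-configured committer class name if set, else the default.

    Mirrors the guard in PairRDDFunctions.saveAsHadoopFile: the default is
    installed only when nothing was set via mapred.output.committer.class.
    """
    return configured if configured is not None else DEFAULT_COMMITTER
```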
Re: Spark job resource allocation best practices
How can I configure Mesos allocation policy to share resources between all current Spark applications? I can't seem to find it in the architecture docs. *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Yes. i believe Mesos is the right choice for you. http://mesos.apache.org/documentation/latest/mesos-architecture/ Thanks Best Regards On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman r...@totango.com wrote: So, as said there, static partitioning is used in Spark’s standalone and YARN modes, as well as the coarse-grained Mesos mode. That leaves us only with Mesos, where there is *dynamic sharing* of CPU cores. It says when the application is not running tasks on a machine, other applications may run tasks on those cores. But my applications are short lived (seconds to minutes), and they read a large dataset, process it, and write the results. They are also IO-bound, meaning most of the time is spent reading input data (from S3) and writing the results back. Is it possible to divide the resources between them, according to how many are trying to run at the same time? So for example if I have 12 cores - if one job is scheduled, it will get 12 cores, but if 3 are scheduled, then each one will get 4 cores and then will all start. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Have a look at scheduling pools https://spark.apache.org/docs/latest/job-scheduling.html. If you want more sophisticated resource allocation, then you are better of to use cluster managers like mesos or yarn Thanks Best Regards On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com wrote: Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. 
This way, when more than one job is scheduled, all but the first are stuck in WAITING. On the other hand, if I tell each job to initially limit itself to a fixed number of CPUs, and that job runs by itself, the cluster is under-utilized and the job runs longer than it could have if it took all the available resources. - How can I give the jobs a fairer resource division, which lets many jobs run together and lets them use all the available resources? - How do you divide resources between applications in your use case? P.S. I started reading about Mesos but couldn't figure out if/how it could solve the described issue. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
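The division Romi is after (one job gets all 12 cores; three concurrent jobs get 4 each) is an even split of a fixed core pool, with any remainder spread across apps. Standalone Spark does not rebalance like this on its own; the sketch below only illustrates the policy:

```python
def split_cores(total_cores, num_apps):
    """Divide a core pool evenly among running apps, spreading the remainder.

    The first `extra` apps get one core more than the rest.
    """
    if num_apps <= 0:
        return []
    base, extra = divmod(total_cores, num_apps)
    return [base + (1 if i < extra else 0) for i in range(num_apps)]
```

For example, split_cores(12, 1) gives one app all 12 cores, while split_cores(12, 3) gives each app 4.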
Re: Spark job resource allocation best practices
I have a single Spark cluster, not multiple frameworks and not multiple versions. Is it relevant for my use-case? Where can I find information about exactly how to make Mesos tell Spark how many resources of the cluster to use? (instead of the default take-all) *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Nov 4, 2014 at 11:00 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can look at different modes over here http://docs.sigmoidanalytics.com/index.php/Spark_On_Mesos#Mesos_Run_Modes These people has very good tutorial to get you started http://mesosphere.com/docs/tutorials/run-spark-on-mesos/#overview Thanks Best Regards On Tue, Nov 4, 2014 at 1:44 PM, Romi Kuntsman r...@totango.com wrote: How can I configure Mesos allocation policy to share resources between all current Spark applications? I can't seem to find it in the architecture docs. *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Yes. i believe Mesos is the right choice for you. http://mesos.apache.org/documentation/latest/mesos-architecture/ Thanks Best Regards On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman r...@totango.com wrote: So, as said there, static partitioning is used in Spark’s standalone and YARN modes, as well as the coarse-grained Mesos mode. That leaves us only with Mesos, where there is *dynamic sharing* of CPU cores. It says when the application is not running tasks on a machine, other applications may run tasks on those cores. But my applications are short lived (seconds to minutes), and they read a large dataset, process it, and write the results. They are also IO-bound, meaning most of the time is spent reading input data (from S3) and writing the results back. Is it possible to divide the resources between them, according to how many are trying to run at the same time? 
So for example if I have 12 cores - if one job is scheduled, it will get 12 cores, but if 3 are scheduled, then each one will get 4 cores and then will all start. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Have a look at scheduling pools https://spark.apache.org/docs/latest/job-scheduling.html. If you want more sophisticated resource allocation, then you are better of to use cluster managers like mesos or yarn Thanks Best Regards On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com wrote: Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. This way, when more than one job is scheduled, all but the first are stuck in WAITING. On the other hand, if I tell each job to initially limit itself to a fixed number of CPUs, and that job runs by itself, the cluster is under-utilized and the job runs longer than it could have if it took all the available resources. - How to give the tasks a more fair resource division, which lets many jobs run together, and together lets them use all the available resources? - How do you divide resources between applications on your usecase? P.S. I started reading about Mesos but couldn't figure out if/how it could solve the described issue. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
Re: Spark job resource allocation best practices
Let's say that I run Spark on Mesos in fine-grained mode, and I have 12 cores and 64GB memory. I run application A on Spark, and some time after that (but before A finished) application B. How many CPUs will each of them get? *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Nov 4, 2014 at 11:33 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to install mesos on your cluster. Then you will run your spark applications by specifying the mesos master (mesos://) instead of (spark://). Spark can run over Mesos in two modes: “*fine-grained*” (default) and “*coarse-grained*”. In “*fine-grained*” mode (default), each Spark task runs as a separate Mesos task. This allows multiple instances of Spark (and other frameworks) to share machines at a very fine granularity, where each application gets more or fewer machines as it ramps up and down, but it comes with an additional overhead in launching each task. This mode may be inappropriate for low-latency requirements like interactive queries or serving web requests. The “*coarse-grained*” mode will instead launch only one long-running Spark task on each Mesos machine, and dynamically schedule its own “mini-tasks” within it. The benefit is much lower startup overhead, but at the cost of reserving the Mesos resources for the complete duration of the application. To run in coarse-grained mode, set the spark.mesos.coarse property in your SparkConf: conf.set("spark.mesos.coarse", "true") In addition, for coarse-grained mode, you can control the maximum number of resources Spark will acquire. By default, it will acquire all cores in the cluster (that get offered by Mesos), which only makes sense if you run just one application at a time. You can cap the maximum number of cores using conf.set("spark.cores.max", "10") (for example). If you run your application in fine-grained mode, then mesos will take care of the resource allocation for you. 
You just tell mesos from your application that you are running in fine-grained mode, and it is the default mode. Thanks Best Regards On Tue, Nov 4, 2014 at 2:46 PM, Romi Kuntsman r...@totango.com wrote: I have a single Spark cluster, not multiple frameworks and not multiple versions. Is it relevant for my use-case? Where can I find information about exactly how to make Mesos tell Spark how many resources of the cluster to use? (instead of the default take-all) *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Nov 4, 2014 at 11:00 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can look at different modes over here http://docs.sigmoidanalytics.com/index.php/Spark_On_Mesos#Mesos_Run_Modes These people has very good tutorial to get you started http://mesosphere.com/docs/tutorials/run-spark-on-mesos/#overview Thanks Best Regards On Tue, Nov 4, 2014 at 1:44 PM, Romi Kuntsman r...@totango.com wrote: How can I configure Mesos allocation policy to share resources between all current Spark applications? I can't seem to find it in the architecture docs. *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Yes. i believe Mesos is the right choice for you. http://mesos.apache.org/documentation/latest/mesos-architecture/ Thanks Best Regards On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman r...@totango.com wrote: So, as said there, static partitioning is used in Spark’s standalone and YARN modes, as well as the coarse-grained Mesos mode. That leaves us only with Mesos, where there is *dynamic sharing* of CPU cores. It says when the application is not running tasks on a machine, other applications may run tasks on those cores. But my applications are short lived (seconds to minutes), and they read a large dataset, process it, and write the results. They are also IO-bound, meaning most of the time is spent reading input data (from S3) and writing the results back. 
Is it possible to divide the resources between them, according to how many are trying to run at the same time? So for example if I have 12 cores - if one job is scheduled, it will get 12 cores, but if 3 are scheduled, then each one will get 4 cores and then will all start. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Have a look at scheduling pools https://spark.apache.org/docs/latest/job-scheduling.html. If you want more sophisticated resource allocation, then you are better of to use cluster managers like mesos or yarn Thanks Best Regards On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com wrote: Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. This way, when more than one job is scheduled, all
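For reference, the two knobs quoted in this thread (coarse-grained mode and the core cap) can also be passed on the spark-submit command line rather than via SparkConf; the master URL, cap value, and job file below are placeholders:

```shell
# Coarse-grained mode: one long-running Spark task per Mesos machine,
# resources reserved for the whole application. Capping the cores lets
# several applications fit on the cluster at once.
spark-submit \
  --master mesos://mesos-master:5050 \
  --conf spark.mesos.coarse=true \
  --conf spark.cores.max=4 \
  my_job.py
```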
Spark job resource allocation best practices
Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. This way, when more than one job is scheduled, all but the first are stuck in WAITING. On the other hand, if I tell each job to initially limit itself to a fixed number of CPUs, and that job runs by itself, the cluster is under-utilized and the job runs longer than it could have if it took all the available resources. - How can I give the jobs a fairer resource division, which lets many jobs run together and lets them use all the available resources? - How do you divide resources between applications in your use case? P.S. I started reading about Mesos but couldn't figure out if/how it could solve the described issue. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
Re: Spark job resource allocation best practices
So, as said there, static partitioning is used in Spark’s standalone and YARN modes, as well as the coarse-grained Mesos mode. That leaves us only with Mesos, where there is *dynamic sharing* of CPU cores. It says when the application is not running tasks on a machine, other applications may run tasks on those cores. But my applications are short lived (seconds to minutes), and they read a large dataset, process it, and write the results. They are also IO-bound, meaning most of the time is spent reading input data (from S3) and writing the results back. Is it possible to divide the resources between them, according to how many are trying to run at the same time? So for example if I have 12 cores - if one job is scheduled, it will get 12 cores, but if 3 are scheduled, then each one will get 4 cores and then will all start. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Have a look at scheduling pools https://spark.apache.org/docs/latest/job-scheduling.html. If you want more sophisticated resource allocation, then you are better of to use cluster managers like mesos or yarn Thanks Best Regards On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com wrote: Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. This way, when more than one job is scheduled, all but the first are stuck in WAITING. On the other hand, if I tell each job to initially limit itself to a fixed number of CPUs, and that job runs by itself, the cluster is under-utilized and the job runs longer than it could have if it took all the available resources. - How to give the tasks a more fair resource division, which lets many jobs run together, and together lets them use all the available resources? - How do you divide resources between applications on your usecase? 
P.S. I started reading about Mesos but couldn't figure out if/how it could solve the described issue. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
Re: Dynamically switching Nr of allocated core
I didn't notice your message and asked about the same question, in the thread with the title Spark job resource allocation best practices. Adding a specific case to your example: 1 - There are 12 cores available in the cluster 2 - I start app B with all cores - gets 12 3 - I start app A - it needs just 2 cores (as you said it will get even when there are 12 available), but gets nothing 4 - Until I stop app B, app A is stuck waiting, instead of app B freeing 2 cores and dropping to 10 cores. *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 3:17 PM, RodrigoB rodrigo.boav...@aspect.com wrote: Hi all, I can't seem to find a clear answer in the documentation. Does the standalone cluster support dynamic assignment of the nr of allocated cores to an application once another app stops? I'm aware that we can have core sharing if we use Mesos between active applications depending on the nr of parallel tasks. I believe my question is slightly simpler. For example: 1 - There are 12 cores available in the cluster 2 - I start app A with 2 cores - gets 2 3 - I start app B - gets remaining 10 4 - If I stop app A, app B *does not* get the now available remaining 2 cores. Should I expect Mesos to have this scenario working? Also, the same question applies to when we add more cores to a cluster. Let's say ideally I want 12 cores for my app, although there are only 10. As I add more workers, they should get assigned to my app dynamically. I haven't tested this in a while but I think the app will not even start and complain about not enough resources... Would very much appreciate any knowledge share on this! tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dynamically-switching-Nr-of-allocated-core-tp17955.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Workers disconnected from master sometimes and never reconnect back
Hi all, Regarding a post here a few months ago http://apache-spark-user-list.1001560.n3.nabble.com/Workers-disconnected-from-master-sometimes-and-never-reconnect-back-tp6240.html Is there an answer to this? I saw workers being still active and not reconnecting after they lost connection to the master. Using Spark 1.1.0. What if a master server is restarted, should worker retry to register on it? Greetings, -- *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com Join the Customer Success Manifesto http://youtu.be/XvFi2Wh6wgU
Re: [Swftools-common] Access Violation in swf_GetU8
Can anyone please answer about Windows binaries? Thanks! On Wed, Oct 3, 2012 at 6:09 PM, imuserpol imuser...@gmail.com wrote: Is there a place to get the Windows binaries from the nightly builds / latest updates? -Original Message- From: swftools-common-bounces+imuserpol+swftools=gmail@nongnu.org [mailto:swftools-common-bounces+imuserpol+swftools=gmail@nongnu.org] On Behalf Of lists Sent: Tuesday, October 02, 2012 11:32 AM To: swftools-common@nongnu.org Cc: Romi Kuntsman Subject: Re: [Swftools-common] Access Violation in swf_GetU8 On Tue, 2 Oct 2012 15:23:04 +0200 Romi Kuntsman rmk...@gmail.com wrote: 1. There is a link to download.html from the main page, not to the wiki The main site came before the wiki. The wiki lists a few more download locations than the main site. I try to keep the wiki current, but time is short and other hands on the wheel are always appreciated. 2. In that page (Available_downloads) there's a link to download version 0.9.2, which is from April. Is there an updated build? The reason I quoted the wiki is that you can check the current state of the main git repository, and the patches applied to date, with the wiki's ViewGIT: http://wiki.swftools.org/viewgit/ As you'll note, while Matthias has updated the git version with the changes (so you could compile yourself), the development releases pre-date the change you refer to. Are you perchance after a Windows binary? On Tue, Oct 2, 2012 at 2:40 PM, Lists li...@qimalta.com wrote: On Tue, 2 Oct 2012 14:32:07 +0200 Romi Kuntsman rmk...@gmail.com wrote: When will a build with those fixes be available online? http://www.swftools.org/download.html All the download links you need are to be found here, http://wiki.swftools.org/wiki/Main_Page#Available_downloads --- SWFTools-common is a self-managed list. 
To subscribe/unsubscribe, or amend an existing subscription, please kindly point your favourite web browser at: http://lists.nongnu.org/mailman/listinfo/swftools-common
Re: [Swftools-common] Access Violation in swf_GetU8
There are a few other places with *(int*)0=0; (or similar intentional access violations) in the code, did you fix them as well? Please publish an updated release build after the fix, thanks! On Mon, Oct 1, 2012 at 12:57 AM, Matthias Kramm kr...@quiss.org wrote: On Sun, Sep 02, 2012 at 12:47:04PM +0300, Romi Kuntsman rmk...@gmail.com wrote:
U8 swf_GetU8(TAG * t)
{ swf_ResetReadBits(t);
#ifdef DEBUG_RFXSWF
  if ((int)t->pos >= (int)t->len) {
    fprintf(stderr, "GetU8() out of bounds: TagID = %i\n", t->id);
    *(int*)0=0;
    return 0;
  }
#endif
  return t->data[t->pos++];
}
Fixed, thanks. Matthias
Re: [Swftools-common] Access Violation in swf_GetU8
It also appears in a few other places in the code:
swftools-2012-04-08-0857\lib\as3\expr.c (1 hit) Line 2608: *(int*)0=0;
swftools-2012-04-08-0857\lib\mem.c (4 hits) Line 18: //*(int*)0=0; Line 25: //*(int*)0 = 0xdead; Line 42: //*(int*)0 = 0xdead; Line 64: //*(int*)0 = 0xdead;
swftools-2012-04-08-0857\lib\rfxswf.c (1 hit) Line 97: *(int*)0=0;
On Sun, Sep 2, 2012 at 12:47 PM, Romi Kuntsman rmk...@gmail.com wrote: Hi, This code CRASHES the program: *(int*)0=0;
U8 swf_GetU8(TAG * t)
{ swf_ResetReadBits(t);
#ifdef DEBUG_RFXSWF
  if ((int)t->pos >= (int)t->len) {
    fprintf(stderr, "GetU8() out of bounds: TagID = %i\n", t->id);
    *(int*)0=0;
    return 0;
  }
#endif
  return t->data[t->pos++];
}
It does not appear in swf_GetU16 or swf_GetU32. What I want is that swfdump will gracefully fail processing the DoAction tag when it gets such an error, not crash my Windows :-) Can you please fix it? Thanks! RK.
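The graceful failure Romi asks for would replace the intentional *(int*)0=0; crash with an error flag and a sentinel return. A sketch in plain C using a simplified stand-in for swftools' TAG struct (the field layout and function name here are illustrative, not the actual rfxswf API):

```c
#include <stdio.h>

/* Simplified stand-in for swftools' TAG: a byte buffer with a read cursor. */
typedef struct {
    const unsigned char *data;
    int len;
    int pos;
    int error;   /* set to 1 on an out-of-bounds read */
} TAG;

/* Bounds-checked byte read: reports the error and returns a 0 sentinel
   instead of crashing the whole process with a null-pointer write. */
unsigned char tag_get_u8(TAG *t)
{
    if (t->pos >= t->len) {
        fprintf(stderr, "GetU8() out of bounds: pos=%d len=%d\n", t->pos, t->len);
        t->error = 1;
        return 0;
    }
    return t->data[t->pos++];
}
```

A caller like swfdump's DoAction parser could then check t->error after each read and abandon the tag cleanly.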
Re: [Swftools-common] Passing swf in stdin/pipe to swfdump
Isn't it possible to read from stdin into a buffer in memory, then determine its size, and then go over the data in memory? On Sun, Aug 26, 2012 at 3:01 AM, Matthias Kramm kr...@quiss.org wrote: On Tue, Aug 07, 2012 at 03:30:20PM +0300, Romi Kuntsman rmk...@gmail.com wrote: I'm handling a SWF file in memory in my program, and would like to pass the file to swfdump and read the output. How can this be done without writing it to a temporary file on disk and then passing the filename as a parameter, for example using a pipe or similar option? Afraid that a temporary file is the only way to do this: in its current implementation, swfdump reads the file twice, once to determine the file type, and a second time to actually parse, so it can't process a stdin stream for that very reason. Matthias
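Romi's suggestion (read stdin into a memory buffer first, then let both passes scan the buffer) is straightforward in C. A sketch with an invented helper name, not part of swftools:

```c
#include <stdio.h>
#include <stdlib.h>

/* Read an entire stream into a malloc'd buffer so it can be scanned
   multiple times (e.g. once to sniff the file type, once to parse).
   Returns NULL on allocation failure; *out_len receives the byte count. */
unsigned char *slurp_stream(FILE *fp, size_t *out_len)
{
    size_t cap = 4096, len = 0;
    unsigned char *buf = malloc(cap);
    if (!buf) return NULL;
    for (;;) {
        if (len == cap) {                     /* grow the buffer */
            cap *= 2;
            unsigned char *tmp = realloc(buf, cap);
            if (!tmp) { free(buf); return NULL; }
            buf = tmp;
        }
        size_t n = fread(buf + len, 1, cap - len, fp);
        len += n;
        if (n == 0) break;                    /* EOF or read error */
    }
    *out_len = len;
    return buf;
}
```

With this, swfdump's two passes could both walk the returned buffer, and stdin (which cannot be rewound) would work like any file.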
[Swftools-common] Passing swf in stdin/pipe to swfdump
Hello, I'm handling a SWF file in memory in my program, and would like to pass the file to swfdump and read the output. How can this be done without writing it to a temporary file on disk and then passing the filename as a parameter, for example using a pipe or similar option? If the code (swfdump.c) needs a minor adjustment (in the open(), allow stdin instead), can you please make it? Thank you, RK.
Re: [Swftools-common] clickable swf from gif/png/jpg
Also, embedding this swf from gif using swfc *doesn't* make it clickable. 2008/8/13, Romi Kuntsman [EMAIL PROTECTED]: 1. gif2swf doesn't handle the animated frames correctly, erasing the entire image instead of just changed places. see attached example. 2. backgroundcolor doesn't work, you probably meant background RK. 2008/8/13, Matthias Kramm [EMAIL PROTECTED]: On Wed, Aug 13, 2008 at 01:23:57PM +0300, Romi Kuntsman [EMAIL PROTECTED] wrote: 1. How can I do it with GIFs? Convert the gif to a SWF first: gif2swf image.gif -o image.swf and then include the image as swf: .swf image image.swf 2. How do I define a background for the flash, to be behind the image in transparent places? .flash name=clickable.swf version=6 backgroundcolor=#3300ff Greetings Matthias -- [ Romi Kuntsman | High Performance Software Engineer | RockeTier - Startup your engines | http://www.rocketier.com ]
[Swftools-common] clickable swf from gif/png/jpg
Hello, I'm using png2swf, gif2swf, jpg2swf to convert images to SWFs. How can I make them clickable - so a click would lead to the standard clickTAG url, or a predefined URL? Thanks, -- Romi Kuntsman | High Performance Software Engineer RockeTier - Startup your engines | http://www.rocketier.com