Re: Error while merge in delta table
Hi Karthick,

Sorry to say it, but there's not enough "data" to help you. There should be something more above or below the exception snippet you posted that could pinpoint the root cause.

Regards,
Jacek Laskowski

"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Wed, May 10, 2023 at 8:03 PM Karthick Nk wrote:
> Hi,
>
> I am trying to merge a dataframe with a Delta table in Databricks, but I am
> getting an error. I have attached the code snippet and the error message
> for reference below.
>
> code:
> [image: image.png]
>
> error:
> [image: image.png]
>
> Thanks
Re: How to determine the function of tasks on each stage in an Apache Spark application?
Hi,

Start with intercepting stage completions using SparkListenerStageCompleted [1]. That's Spark Core (jobs, stages and tasks).

Go up the execution chain to Spark SQL with SparkListenerSQLExecutionStart [2] and SparkListenerSQLExecutionEnd [3], and correlate the information from the two levels.

You may want to look at how the web UI works under the covers to collect all the information. Start from SQLTab [4], which should show you what is displayed (and, from there, what's needed and how it's collected).

[1] https://github.com/apache/spark/blob/8cceb3946bdfa5ceac0f2b4fe6a7c43eafb76d59/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L46
[2] https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44
[3] https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L60
[4] https://github.com/apache/spark/blob/c124037b97538b2656d29ce547b2a42209a41703/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala#L24

Regards,
Jacek Laskowski

On Thu, Apr 13, 2023 at 10:40 AM Trường Trần Phan An wrote:
> Hi,
>
> Can you give me more details or point me to a tutorial on "You'd have to
> intercept execution events and correlate them. Not an easy task yet doable"?
>
> Thanks
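
A minimal sketch of the listener-based approach described in this reply. The class name StageToQueryListener and the sample join are made up for illustration; the event classes are the ones referenced in [1]-[3]. It assumes that Spark SQL tags its jobs with the "spark.sql.execution.id" local property, which is what allows a job (and its stages) to be tied back to a SQL execution. It only prints the events; a real tool would store and join them.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart, SparkListenerStageCompleted}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Hypothetical listener: prints enough to correlate SQL executions, jobs and stages by hand.
class StageToQueryListener extends SparkListener {

  // Spark Core level: the stages of a job and (if any) the SQL execution that triggered it.
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Assumption: Spark SQL sets this local property; it is absent for plain RDD jobs.
    val executionId = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.sql.execution.id")))
      .getOrElse("n/a")
    val stageIds = jobStart.stageIds.mkString(",")
    println(s"job=${jobStart.jobId} stages=$stageIds sqlExecution=$executionId")
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    println(s"stage=${info.stageId} name=${info.name} tasks=${info.numTasks}")
  }

  // SQL events have no dedicated callback and arrive through onOtherEvent.
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case start: SparkListenerSQLExecutionStart =>
      println(s"sqlExecution=${start.executionId} started: ${start.description}")
      println(start.physicalPlanDescription)
    case end: SparkListenerSQLExecutionEnd =>
      println(s"sqlExecution=${end.executionId} finished")
    case _ => // ignore all other events
  }
}

val spark = SparkSession.builder().master("local[*]").appName("listener-sketch").getOrCreate()
spark.sparkContext.addSparkListener(new StageToQueryListener)

// Any query will do; this one is only here to generate events.
val left = spark.range(1000).withColumnRenamed("id", "k")
val right = spark.range(1000).withColumnRenamed("id", "k")
left.join(right, "k").count()
```

Events are delivered asynchronously, so the printed lines may interleave with the query output. The physical plan printed at SQL execution start is the closest thing to "what the tasks do" that is available without reading the Spark SQL sources.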
Re: How to create spark udf use functioncatalog?
Hi,

I'm not sure I understand the question, but if your question is how to register (plug in) your own custom FunctionCatalog, it's through the spark.sql.catalog configuration property, e.g.

spark.sql.catalog.catalog-name=com.example.YourCatalogClass

spark.sql.catalog registers a CatalogPlugin that in your case is also supposed to be a FunctionCatalog.

When needed, implicit class CatalogHelper.asFunctionCatalog is going to be used to expose your custom CatalogPlugin (e.g., catalog-name above) as a FunctionCatalog, so functions identified by three-part identifiers (catalog.schema.function) are resolved and used properly using the custom catalog implementation.

HTH

Regards,
Jacek Laskowski

On Fri, Apr 14, 2023 at 2:10 PM 许新浩 <948718...@qq.com.invalid> wrote:
> We are using Spark. Today I came across FunctionCatalog. I have read the
> source of
> spark\sql\core\src\test\scala\org\apache\spark\sql\connector\DataSourceV2FunctionSuite.scala
> and have implemented ScalarFunction, but I still do not know how
> to register it in SQL.
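
A configuration sketch of the registration described in this reply. Everything named here is an assumption for illustration: the catalog name my_catalog, the class com.example.YourCatalogClass (which must implement FunctionCatalog and be on the classpath), and a scalar function my_func in namespace ns. Without such a class the query fails when the catalog is first loaded.

```scala
import org.apache.spark.sql.SparkSession

// Assumptions (not from the thread): catalog "my_catalog", class
// com.example.YourCatalogClass implementing FunctionCatalog, and a scalar
// function "my_func" exposed by that catalog in namespace "ns".
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("function-catalog-sketch")
  .config("spark.sql.catalog.my_catalog", "com.example.YourCatalogClass")
  .getOrCreate()

// Three-part identifier: catalog.namespace.function
spark.sql("SELECT my_catalog.ns.my_func(id) FROM range(3)").show()
```

The same property can be passed on the command line with --conf. A catalog name containing a hyphen (as in catalog-name) would have to be quoted with backticks in SQL, which is why the sketch uses an underscore.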
Re: How to determine the function of tasks on each stage in an Apache Spark application?
Hi,

tl;dr it's not possible to "reverse-engineer" tasks to functions.

In essence, Spark SQL is an abstraction layer over the RDD API that's made up of partitions and tasks. Tasks are Scala functions (possibly with some Python for PySpark). A simple-looking high-level operator like DataFrame.join can end up with multiple RDDs, each with a set of partitions (and hence tasks). What the tasks do is an implementation detail that you'd have to know about by reading the source code of Spark SQL that produces the "bytecode".

Just looking at the DAG or the tasks screenshots won't give you that level of detail. You'd have to intercept execution events and correlate them. Not an easy task yet doable. HTH.

Regards,
Jacek Laskowski

On Tue, Apr 11, 2023 at 6:53 PM Trường Trần Phan An wrote:
> Hi all,
>
> I am conducting a study comparing the execution time of Bloom Filter Join
> operation on two environments: Apache Spark Cluster and Apache Spark. I
> have compared the overall time of the two environments, but I want to
> compare specific "tasks on each stage" to see which computation has the
> most significant difference.
>
> I have taken a screenshot of the DAG of Stage 0 and the list of tasks
> executed in Stage 0.
> - DAG.png
> - Task.png
>
> *I have questions:*
> 1. Can we determine which tasks are responsible for executing each step
> scheduled on the DAG during the processing?
> 2. Is it possible to know the function of each task (e.g., what is task ID
> 0 responsible for? What is task ID 1 responsible for? ... )?
>
> Best regards,
> Truong
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: [SparkSQL, SparkUI, RESTAPI] How to extract the WholeStageCodeGen ids from SparkUI
Hi,

You could use QueryExecutionListener or Spark listeners to intercept query execution events and extract whatever is required. That's what the web UI does (as it's simply a bunch of SparkListeners --> https://youtu.be/mVP9sZ6K__Y ;-)).

Regards,
Jacek Laskowski

On Fri, Apr 7, 2023 at 12:23 PM Chenghao Lyu wrote:
> Hi,
>
> The detailed stage page shows the involved WholeStageCodegen Ids in its
> DAG visualization from the Spark UI when running a SparkSQL. (e.g., under
> the link
> node:18088/history/application_1663600377480_62091/stages/stage/?id=1=0).
>
> However, I have trouble extracting the WholeStageCodegen ids from the DAG
> visualization via the RESTAPIs. Is there any other way to get the
> WholeStageCodegen Ids information for each stage automatically?
>
> Cheers,
> Chenghao
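
A rough sketch of the QueryExecutionListener route mentioned in this reply. It collects the codegen stage ids per query, not per scheduler stage; mapping them to stages would still require correlating with stage-level listener events. The sample query and application name are made up. One caveat to verify for your version: with adaptive query execution the executed plan may be wrapped in an adaptive node, in which case a plain collect over the plan would not see the nested WholeStageCodegenExec operators.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec}
import org.apache.spark.sql.util.QueryExecutionListener

val spark = SparkSession.builder().master("local[*]").appName("codegen-ids-sketch").getOrCreate()

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // Collect the ids of all WholeStageCodegenExec nodes in the physical plan.
    val ids = qe.executedPlan.collect { case w: WholeStageCodegenExec => w.codegenStageId }
    val idList = ids.mkString(", ")
    println(s"$funcName: WholeStageCodegen ids = [$idList]")
  }

  // Failures are ignored in this sketch.
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})

// A simple query without a shuffle, only here to trigger the listener.
spark.range(10).selectExpr("id * 2 AS doubled").collect()
```

The callback runs asynchronously, so the line is printed shortly after the action returns.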
Re: spark.catalog.listFunctions type signatures
Hi,

Interesting question indeed!

The closest I could get would be to use lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder] [1] followed by extracting the dataType from T in `type FunctionBuilder = Seq[Expression] => T`, which can be Expression (regular functions) or LogicalPlan (table-valued functions). Expression has got dataType while LogicalPlan has got output (or outputAttributes).

HTH

Let us know how you're doing.

BTW, can you describe how you are "using Apache Calcite to run some SQL transformations on Apache sparks SQL statements"?

[1] https://github.com/apache/spark/blob/e60ce3e85081ca8bb247aeceb2681faf6a59a056/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L91

Regards,
Jacek Laskowski

On Tue, Mar 28, 2023 at 9:01 PM Guillaume Masse <masse.guilla...@narrative.io> wrote:
> Hi,
>
> I'm using Apache Calcite to run some SQL transformations on Apache sparks
> SQL statements. I would like to extract the type signature out
> of spark.catalog.listFunctions to be able to register them in Calcite with
> their proper signature.
>
> From the API, I can get the fully qualified class name and the name, but
> unfortunately, the type signature is not present. Would there be a way to
> use reflection to extract it? For example:
>
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala#L424
>
> Ideally, it would be convenient to get the type signature
> from org.apache.spark.sql.catalog.Function itself when available.
>
> --
> Guillaume Massé
> [Gee-OHM]
> (马赛卫)
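
A small sketch of the lookupFunctionBuilder idea from this reply, for the regular-function case only. It assumes access through spark.sessionState (an unstable API) and uses the built-in upper function with a made-up sample argument. Note that this yields the result type for one particular set of arguments, not a full signature: many functions return different types depending on their inputs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.Literal

val spark = SparkSession.builder().master("local[*]").appName("function-types-sketch").getOrCreate()

// sessionState is an unstable API and may change between releases.
val registry = spark.sessionState.functionRegistry

// Build the expression with sample arguments, then ask it for its result type.
val upperResultType = registry
  .lookupFunctionBuilder(FunctionIdentifier("upper"))
  .map(builder => builder(Seq(Literal("a"))).dataType)

println(upperResultType)
```

For input types, some expressions expose an inputTypes member (via the ExpectsInputTypes trait), which might be a starting point; whether that covers enough functions for Calcite registration would need to be checked.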
Re: [ANNOUNCE] Apache Spark 3.3.1 released
Yoohoo! Thanks Yuming for driving this release. A tiny step for Spark, a huge one for my clients (who still are on 3.2.1 or even older :))

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski

On Wed, Oct 26, 2022 at 8:22 AM Yuming Wang wrote:
> We are happy to announce the availability of Apache Spark 3.3.1!
>
> Spark 3.3.1 is a maintenance release containing stability fixes. This
> release is based on the branch-3.3 maintenance branch of Spark. We strongly
> recommend all 3.3 users to upgrade to this stable release.
>
> To download Spark 3.3.1, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-3-1.html
>
> We would like to acknowledge all community members for contributing to this
> release. This release would not have been possible without you.
Re: Prometheus with spark
Hi Raj,

Do you want to do the following?

spark.read.format("prometheus").load...

I haven't heard of such a data source / format before. What would you like it for?

Regards,
Jacek Laskowski

On Fri, Oct 21, 2022 at 6:12 PM Raj ks wrote:
> Hi Team,
>
> We wanted to query Prometheus data with Spark. Any suggestions will
> be appreciated.
>
> Searched for documents but did not find a suitable one.
Re: query time comparison to several SQL engines
Hi Wes,

Thanks for the report! I like it (mostly because it's short and concise). Thank you.

I know nothing about Drill and am curious about the similar execution times and this sentence in the report: "Spark is the second fastest, that should be reasonable, since both Spark and Drill have almost the same implementation architecture.".

Is it true that Drill is Spark or vice versa under the hood? If so, how is it possible that Drill is faster? What does Drill do to make the query faster? Could it be that you used a type of query Drill is optimized for? Just guessing and am really curious (not implying that one is better or worse than the other(s)).

Regards,
Jacek Laskowski

On Thu, Apr 7, 2022 at 1:05 PM Wes Peng wrote:
> I made a simple test of query time for several SQL engines including
> mysql, hive, drill and spark. The report:
>
> https://cloudcache.net/data/query-time-mysql-hive-drill-spark.pdf
>
> It may have no special meaning, just for fun. :)
>
> regards.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext
Hi,

No idea still, but I noticed "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \", which bother me quite a lot.

First of all, it's a Spark Streaming (not Structured Streaming) app. Correct? Please upgrade at your earliest convenience since Spark Streaming is no longer in active development (if supported at all).

Secondly, why are these jars listed explicitly since they're part of Spark? You should not really be doing such risky config changes (unless you've got no other choice and you know what you're doing).

Regards,
Jacek Laskowski

On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou wrote:
> Yes you are right.
> I am using Spring Boot for this.
>
> The same does work for the event that does not involve any kafka events.
> But again I am not sending out extra jars there so nothing is replaced and
> we are using the default ones.
>
> If I do not use the userClassPathFirst which will force the service to use
> the newer version I will end up with the same problem.
>
> We are using protobuf v3+ and as such we need to push that version since
> apache core uses an older version.
>
> So all we should really need is the following: --jars
> "protobuf-java-3.17.3.jar" \
> and here we need the userClassPathFirst=true in order to use the latest
> version.
>
> Using only this jar as it works on local or no jars defined we ended up
> with the following error.
>
> 21/08/31 10:53:40 WARN org.apache.spark.scheduler.TaskSetManager: Lost
> task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
> java.lang.ClassNotFoundException:
> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>   at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>   at java.base/java.lang.Class.forName0(Native Method)
>   at java.base/java.lang.Class.forName(Unknown Source)
>   at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>
> Which can be resolved by passing more jars.
>
> Any idea about this error?
>
> K8 does not seem to like this, but Java Spring should be the one that is
> responsible for the version, yet it seems K8 does not like these versions.
>
> Perhaps a misconfiguration on K8?
>
> I haven't set that up so I am not aware of what was done there.
>
> Downgrading to java 8 on my K8 might not be so easy. I want to explore
> if there is something else before doing that as we will need to spin off
> new instances of K8 to check that.
>
> Thank you for the time taken
Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext
Hi Stelios,

I've never seen this error before, but a couple of things caught my attention that I would look at closer to chase the root cause of the issue.

"org.springframework.context.annotation.AnnotationConfigApplicationContext:" and "21/08/31 07:28:42 ERROR org.springframework.boot.SpringApplication: Application run failed" seem to indicate that you're using Spring Boot (that I know almost nothing about so take the following with a pinch of salt :))

Spring Boot manages the classpath by itself, and that, together with another interesting option in your spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how much this exception:

> org.apache.spark.scheduler.ExternalClusterManager: org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a subtype

could be due to casting compatible types from two different classloaders?

Just a thought but wanted to share as I think it's worth investigating.

Regards,
Jacek Laskowski

On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou wrote:
> Hello,
>
> I have been facing the current issue for some time now and I was wondering
> if someone might have some insight on how I can resolve the following.
>
> The code (java 11) is working correctly on my local machine but whenever I
> try to launch the following on K8 I am getting the following error.
>
> 21/08/31 07:28:42 ERROR org.apache.spark.SparkContext: Error
> initializing SparkContext.
>
> java.util.ServiceConfigurationError:
> org.apache.spark.scheduler.ExternalClusterManager:
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
> subtype
>
> I have a Spark application that will monitor some directories and handle
> the data accordingly.
>
> That part is working correctly on K8 and the SparkContext has no issue
> being initialized there.
>
> This is the spark-submit for that:
>
> spark-submit \
>   --master=k8s://https://url:port \
>   --deploy-mode cluster \
>   --name a-name \
>   --conf spark.driver.userClassPathFirst=true \
>   --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>   --files "application-dev.properties,keystore.jks,truststore.jks" \
>   --conf spark.kubernetes.container.image=url/spark:spark-submit \
>   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>   --conf spark.kubernetes.namespace=spark \
>   --conf spark.kubernetes.container.image.pullPolicy=Always \
>   --conf spark.dynamicAllocation.enabled=false \
>   --driver-memory 525m --executor-memory 525m \
>   --num-executors 1 --executor-cores 1 \
>   target/SparkStream.jar continuous-merge
>
> My issue comes when I try to launch the service in order to listen to
> kafka events and store them in HDFS.
>
> spark-submit \
>   --master=k8s://https://url:port \
>   --deploy-mode cluster \
>   --name consume-data \
>   --conf spark.driver.userClassPathFirst=true \
>   --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>   --files "application-dev.properties,keystore.jks,truststore.jks" \
>   --jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \
>   --conf spark.kubernetes.container.image=url/spark:spark-submit \
>   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>   --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
>   --conf spark.kubernetes.namespace=spark \
>   --conf spark.kubernetes.container.image.pullPolicy=Always \
>   --conf spark.dynamicAllocation.enabled=false \
>   --driver-memory 1g --executor-memory 1g \
>   --num-executors 1 --executor-cores 1 \
>   target/SparkStream.jar consume
>
> It could be that I am launching the application wrongly or perhaps that my
> K8 is not configured correctly?
>
> I have stripped down my code and left it barebone and will end up with the
> following issue:
>
> 21/08/31 07:28:42 ERROR org.apache.spark.SparkContext: Error
> initializing SparkContext.
>
> java.util.ServiceConfigurationError:
> org.apache.spark.scheduler.ExternalClusterManager:
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
> subtype
>   at java.base/java.util.ServiceLoader.fail(Unknown Source)
>   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown Source)
>   at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown Source)
>   at java.base/java.util.ServiceLoader$
Re: Connection reset by peer : failed to remove cache rdd
Hi,

No idea what might be going on here, but I'd not worry much about it and simply monitor disk usage as some broadcast blocks might be left over.

Do you know when in your application lifecycle it happens? Spark SQL or Structured Streaming? Do you use broadcast variables or are the errors coming from broadcast joins perhaps?

Regards,
Jacek Laskowski

On Mon, Aug 30, 2021 at 3:26 PM Harsh Sharma wrote:
> We are facing an issue in production where we are getting frequent
> "Still have 1 request outstanding when connection with the hostname was
> closed" and "connection reset by peer" errors, as well as warnings:
> "failed to remove cache rdd" or "failed to remove broadcast variable".
>
> Please help us mitigate this:
>
> Executor memory : 12g
> Network timeout : 60
> Heartbeat interval : 25
>
> [Stage 284:>(199 + 1) / 200][Stage 292:> (1 + 3) / 200]
> [Stage 284:>(199 + 1) / 200][Stage 292:> (2 + 3) / 200]
> [Stage 292:> (2 + 4) / 200]
> [14/06/21 10:46:17,006 WARN shuffle-server-4](TransportChannelHandler) Exception in connection from
> java.io.IOException: Connection reset by peer
>   at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>   at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
>   at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
>   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>   at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
>   at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:748)
> [14/06/21 10:46:17,010 ERROR shuffle-server-4](TransportResponseHandler) Still have 1 requests outstanding when connection from is closed
> [14/06/21 10:46:17,012 ERROR Spark Context Cleaner](ContextCleaner) Error cleaning broadcast 159
> java.io.IOException: Connection reset by peer
>   at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>   at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
>   at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
>   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>   at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
>   at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:748)
> [14/06/21 10:46:17,012 WARN block-manager-ask-thread-pool-69](BlockManagerMaster) Failed to remove broadcast 159 with removeFromMaster = true - Connection reset by peer
> java.io.IOException: Connection reset by peer
>   at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>   at sun.
Re: Java : Testing RDD aggregateByKey
Hi Pedro,

> Anyway, maybe the behavior is weird, I could expect that repartition to zero was not allowed or at least warned instead of just discarding all the data.

Interesting...

scala> spark.version
res3: String = 3.1.2

scala> spark.range(5).repartition(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
  at scala.Predef$.require(Predef.scala:281)
  at org.apache.spark.sql.catalyst.plans.logical.Repartition.<init>(basicLogicalOperators.scala:1032)
  at org.apache.spark.sql.Dataset.repartition(Dataset.scala:3016)
  ... 47 elided

How are the above different from yours?

Regards,
Jacek Laskowski

On Thu, Aug 19, 2021 at 5:43 PM Pedro Tuero wrote:
> Hi, I'm sorry, the problem was really silly: in the test the number of
> partitions was zero (it was a division of the original number of
> partitions of the RDD source and in the test that number was just one) and
> that's why the test was failing.
> Anyway, maybe the behavior is weird, I could expect that repartition to
> zero was not allowed or at least warned instead of just discarding all the
> data.
>
> Thanks for your time!
> Regards,
> Pedro
>
> On Thu, Aug 19, 2021 at 07:42, Jacek Laskowski (ja...@japila.pl) wrote:
>
>> Hi Pedro,
>>
>> No idea what might be causing it. Do you perhaps have some code to
>> reproduce it locally?
>>
>> Regards,
>> Jacek Laskowski
>>
>> On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero wrote:
>>
>>> Context: spark-core_2.12-3.1.1
>>> Testing with maven and eclipse.
>>>
>>> I'm modifying a project and a test stops working as expected.
>>> The difference is in the parameters passed to the function
>>> aggregateByKey of JavaPairRDD.
>>>
>>> JavaSparkContext is created this way:
>>> new JavaSparkContext(new SparkConf()
>>>   .setMaster("local[1]")
>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>>> Then I construct a JavaPairRDD using sparkContext.parallelizePairs and
>>> call a method which makes an aggregateByKey over the input JavaPairRDD and
>>> test that the result is the expected.
>>>
>>> When I use JavaPairRDD line 369 (doing .aggregateByKey(zeroValue,
>>> combiner, merger);)
>>> def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>>>     combFunc: JFunction2[U, U, U]):
>>>   JavaPairRDD[K, U] = {
>>>   implicit val ctag: ClassTag[U] = fakeClassTag
>>>   fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>>> }
>>> The test works as expected.
>>> But when I use: JavaPairRDD line 355 (doing .aggregateByKey(zeroValue,
>>> partitions, combiner, merger);)
>>> def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc:
>>>     JFunction2[U, V, U],
>>>     combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>>   implicit val ctag: ClassTag[U] = fakeClassTag
>>>   fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc,
>>>     combFunc))
>>> }
>>> The result is always empty. It looks like there is a problem with the
>>> HashPartitioner created at PairRDDFunctions:
>>> def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions:
>>>     Int)(seqOp: (U, V) => U,
>>>     combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>   aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp,
>>>     combOp)
>>> }
>>> vs:
>>> def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>>>     combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>>   aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
>>> }
>>> I can't debug it properly with eclipse, and the error occurs when threads
>>> are in spark code (system editor can only open file base resources).
>>>
>>> Does anyone know how to resolve this issue?
>>>
>>> Thanks in advance,
>>> Pedro.
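
A sketch of one way to guard against the zero-partition case reported in this thread. It is written in Scala rather than Java to match the other snippets here, and the helper safeNumPartitions is made up for illustration. One likely explanation for the difference between the two observations (an assumption worth checking against your Spark version): Dataset.repartition validates that the number is positive, while the RDD path goes through HashPartitioner, which appears to reject only negative values, so zero partitions silently yields an empty result.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("aggregate-by-key-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical helper: never let integer division produce zero partitions.
def safeNumPartitions(sourcePartitions: Int, divisor: Int): Int =
  math.max(1, sourcePartitions / divisor)

// A single-partition source, as in the failing test described above.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), numSlices = 1)

// 1 / 2 == 0 with integer division; the guard lifts it back to 1.
val numPartitions = safeNumPartitions(pairs.getNumPartitions, 2)

val sums = pairs
  .aggregateByKey(0, numPartitions)(_ + _, _ + _)
  .collect()
  .toMap

println(sums)
```

The same guard carries over to the Java API, since JavaPairRDD.aggregateByKey(zeroValue, numPartitions, seqFunc, combFunc) simply delegates to the Scala method quoted in the thread.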
Re: Is memory-only no-disk Spark possible?
Hi Bobby, What a great summary of what happens behind the scenes! Enjoyed every sentence! "The default shuffle implementation will always write out to disk." <-- that's what I wasn't sure about the most. Thanks again! /me On digging deeper... Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Fri, Aug 20, 2021 at 4:27 PM Bobby Evans wrote: > On the data path, Spark will write to a local disk when it runs out of > memory and needs to spill or when doing a shuffle with the default shuffle > implementation. The spilling is a good thing because it lets you process > data that is too large to fit in memory. It is not great because the > processing slows down a lot when that happens, but slow is better than > crashing in many cases. The default shuffle implementation will > always write out to disk. This again is good in that it allows you to > process more data on a single box than can fit in memory. It is bad when > the shuffle data could fit in memory, but ends up being written to disk > anyways. On Linux the data is being written into the page cache and will > be flushed to disk in the background when memory is needed or after a set > amount of time. If your query is fast and is shuffling little data, then it > is likely that your query is running all in memory. All of the shuffle > reads and writes are probably going directly to the page cache and the disk > is not involved at all. If you really want to you can configure the > pagecache to not spill to disk until absolutely necessary. That should get > you really close to pure in-memory processing, so long as you have enough > free memory on the host to support it. > > Bobby > > > > On Fri, Aug 20, 2021 at 7:57 AM Mich Talebzadeh > wrote: > >> Well I don't know what having an "in-memory Spark only" is going to >> achieve. 
Spark GUI shows the amount of disk usage pretty well. The memory >> is used exclusively by default first. >> >> Spark is no different from a predominantly in-memory application. >> Effectively it is doing the classical disk-based Hadoop map-reduce >> operation "in memory" to speed up the processing but it is still an >> application on top of the OS. So like most applications, there is a state >> of Spark, the code running and the OS(s), where disk usage will be needed. >> >> This is akin to swap space on OS itself and I quote "Swap space is used when >> your operating system decides that it needs physical memory for active >> processes and the amount of available (unused) physical memory is >> insufficient. When this happens, inactive pages from the physical memory >> are then moved into the swap space, freeing up that physical memory for >> other uses" >> >> free
>>               total        used        free      shared  buff/cache   available
>> Mem:       65659732    30116700     1429436     2341772    34113596    32665372
>> Swap:     104857596      550912   104306684
>> >> HTH >> >> >> view my Linkedin profile >> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >> >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> >> On Fri, 20 Aug 2021 at 12:50, Jacek Laskowski wrote: >> >>> Hi, >>> >>> I've been exploring BlockManager and the stores for a while now and am >>> tempted to say that a memory-only Spark setup would be possible (except >>> shuffle blocks). Is this correct? >>> >>> What about shuffle blocks? Do they have to be stored on disk (in >>> DiskStore)?
>>> >>> I think broadcast variables are in-memory first so except on-disk >>> storage level explicitly used (by Spark devs), there's no reason not to >>> have Spark in-memory only. >>> >>> (I was told that one of the differences between Trino/Presto vs Spark >>> SQL is that Trino keeps all processing in-memory only and will blow up >>> while Spark uses disk to avoid OOMEs). >>> >>> Pozdrawiam, >>> Jacek Laskowski >>> >>> https://about.me/JacekLaskowski >>> "The Internals Of" Online Books <https://books.japila.pl/> >>> Follow me on https://twitter.com/jaceklaskowski >>> >>> <https://twitter.com/jaceklaskowski> >>> >>
Is memory-only no-disk Spark possible?
Hi, I've been exploring BlockManager and the stores for a while now and am tempted to say that a memory-only Spark setup would be possible (except shuffle blocks). Is this correct? What about shuffle blocks? Do they have to be stored on disk (in DiskStore)? I think broadcast variables are in-memory first, so unless an on-disk storage level is used explicitly (by Spark devs), there's no reason not to have Spark in-memory only. (I was told that one of the differences between Trino/Presto and Spark SQL is that Trino keeps all processing in memory and will blow up, while Spark uses disk to avoid OOMEs.) Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski>
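One way to approximate the memory-only setup asked about here follows from the page-cache point made in the replies: point Spark's scratch directory at a RAM-backed filesystem, so shuffle and spill files never reach a physical disk. Below is a minimal Python sketch under stated assumptions. The helper names (`memory_leaning_conf`, `to_submit_args`) and the `/dev/shm/spark-local` path are made up for illustration; only `spark.local.dir` is a real Spark setting, and a cluster manager may override it (for example through `SPARK_LOCAL_DIRS`).

```python
from typing import Dict, List


def memory_leaning_conf(tmpfs_dir: str = "/dev/shm/spark-local") -> Dict[str, str]:
    """Return Spark settings that keep scratch files on a RAM-backed filesystem.

    Assumption: tmpfs_dir is on tmpfs, so "disk" writes stay in memory.
    """
    return {
        # Map output (shuffle) files and spilled data go under spark.local.dir.
        "spark.local.dir": tmpfs_dir,
    }


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render the settings as spark-submit --conf arguments."""
    args: List[str] = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(memory_leaning_conf())))
```

This only moves the files; the shuffle still goes through the filesystem API, and the host needs enough free memory to hold everything that would otherwise land on disk.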
Re: Java : Testing RDD aggregateByKey
Hi Pedro, No idea what might be causing it. Do you perhaps have some code to reproduce it locally? Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero wrote:
> Context: spark-core_2.12-3.1.1
> Testing with Maven and Eclipse.
>
> I'm modifying a project and a test has stopped working as expected.
> The difference is in the parameters passed to the aggregateByKey function of JavaPairRDD.
>
> JavaSparkContext is created this way:
>
>   new JavaSparkContext(new SparkConf()
>       .setMaster("local[1]")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>
> Then I construct a JavaPairRDD using sparkContext.parallelizePairs and call a method that runs
> aggregateByKey over the input JavaPairRDD, and test that the result is the expected one.
>
> When I use JavaPairRDD line 369 (doing .aggregateByKey(zeroValue, combiner, merger);):
>
>   def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>       combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>     implicit val ctag: ClassTag[U] = fakeClassTag
>     fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>   }
>
> the test works as expected.
> But when I use JavaPairRDD line 355 (doing .aggregateByKey(zeroValue, partitions, combiner, merger);):
>
>   def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U],
>       combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>     implicit val ctag: ClassTag[U] = fakeClassTag
>     fromRDD(rdd.aggregateByKey(zeroValue, numPartitions)(seqFunc, combFunc))
>   }
>
> the result is always empty. It looks like there is a problem with the HashPartitioner created at PairRDDFunctions:
>
>   def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>     aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
>   }
>
> vs:
>
>   def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>       combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>     aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
>   }
>
> I can't debug it properly with Eclipse: an error occurs when threads are in Spark code
> ("system editor can only open file base resources").
>
> Does anyone know how to resolve this issue?
>
> Thanks in advance,
> Pedro.
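To reason about what the two overloads should return, it can help to model the semantics outside Spark. The sketch below is a pure-Python model, not Spark's implementation: `aggregate_by_key` and its parameters are hypothetical names, and the round-robin split of the input is an arbitrary assumption. It folds values per key inside each input partition with the sequence function, routes the partial results by `hash(key) % num_partitions`, and merges them with the combine function.

```python
import copy
from typing import Any, Callable, Dict, Hashable, Iterable, List, Tuple

Pair = Tuple[Hashable, Any]


def aggregate_by_key(
    pairs: Iterable[Pair],
    zero_value: Any,
    seq_op: Callable[[Any, Any], Any],
    comb_op: Callable[[Any, Any], Any],
    num_partitions: int,
    num_input_partitions: int = 2,
) -> Dict[Hashable, Any]:
    """Pure-Python model of aggregateByKey with a hash partitioner."""
    # Spread the input round-robin over the "map-side" partitions.
    inputs: List[List[Pair]] = [[] for _ in range(num_input_partitions)]
    for i, pair in enumerate(pairs):
        inputs[i % num_input_partitions].append(pair)

    # Map side: fold values into a per-key accumulator, starting from a
    # fresh copy of zero_value for each key in each partition.
    partials: List[Dict[Hashable, Any]] = []
    for part in inputs:
        acc: Dict[Hashable, Any] = {}
        for key, value in part:
            if key not in acc:
                acc[key] = copy.deepcopy(zero_value)
            acc[key] = seq_op(acc[key], value)
        partials.append(acc)

    # Shuffle: route each partial result to hash(key) % num_partitions and
    # merge partial results for the same key with comb_op.
    outputs: List[Dict[Hashable, Any]] = [{} for _ in range(num_partitions)]
    for acc in partials:
        for key, partial in acc.items():
            target = outputs[hash(key) % num_partitions]
            target[key] = comb_op(target[key], partial) if key in target else partial

    # Collect all output partitions into a single result.
    result: Dict[Hashable, Any] = {}
    for out in outputs:
        result.update(out)
    return result


if __name__ == "__main__":
    data = [("a", 1), ("b", 2), ("a", 3), ("b", 4)]
    for n in (1, 4):
        print(n, aggregate_by_key(data, 0, lambda x, y: x + y, lambda x, y: x + y, n))
```

In this model the number of output partitions changes where a key lands but not the aggregated values, so an empty result with an explicit partition count suggests looking outside the aggregation logic itself, for example at how the keys hash or how the test collects its output. That is a guess to check against a local reproduction, not a diagnosis.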
Re: [ANNOUNCE] Apache Spark 3.1.2 released
Big shout-out to you, Dongjoon! Thank you. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Wed, Jun 2, 2021 at 2:59 AM Dongjoon Hyun wrote: > We are happy to announce the availability of Spark 3.1.2! > > Spark 3.1.2 is a maintenance release containing stability fixes. This > release is based on the branch-3.1 maintenance branch of Spark. We strongly > recommend all 3.1 users to upgrade to this stable release. > > To download Spark 3.1.2, head over to the download page: > https://spark.apache.org/downloads.html > > To view the release notes: > https://spark.apache.org/releases/spark-release-3-1-2.html > > We would like to acknowledge all community members for contributing to this > release. This release would not have been possible without you. > > Dongjoon Hyun >
Re: Updating spark-env.sh per application
Hi, The easiest (though not necessarily the most flexible) approach is simply to use two different versions of the spark-submit script, each with the env var set to a different value. Have you tried it yet? Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Sun, May 9, 2021 at 5:32 AM Renu Yadav wrote: > Hi Mich, > > In spark-env.sh, SPARK_DIST_CLASSPATH is set. I want to override this > variable at runtime as I want to exclude one lib class from it. > > > > On Fri, 7 May, 2021, 6:51 pm Mich Talebzadeh, > wrote: > >> Hi, >> >> Environment variables are read in when spark-submit kicks off. What >> exactly do you need to refresh at the application level? >> >> HTH >> >> On Fri, 7 May 2021 at 11:34, Renu Yadav wrote: >> >>> Hi Team, >>> >>> Is it possible to override the variable of spark-env.sh on application >>> level ? >>> >>> Thanks & Regards, >>> Renu Yadav >>> >>> >>> On Fri, May 7, 2021 at 12:16 PM Renu Yadav wrote: >>> >>>> Hi Team, >>>> >>>> Is it possible to override the variable of spark-env.sh on application >>>> level ? >>>> >>>> Thanks & Regards, >>>> Renu Yadav
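The "one launcher per application" idea can also be expressed as a small wrapper that prepares a different environment for each spark-submit call. The Python sketch below is only an illustration: `classpath_without` and `app_env` are hypothetical helpers, and the substring match used to drop the unwanted entry is an assumption about how the library can be recognised. `SPARK_DIST_CLASSPATH` and `SPARK_CONF_DIR` are the real environment variables.

```python
import os
from typing import Dict, Mapping, Optional


def classpath_without(classpath: str, unwanted: str) -> str:
    """Drop every classpath entry whose path contains `unwanted`."""
    entries = [e for e in classpath.split(os.pathsep) if e and unwanted not in e]
    return os.pathsep.join(entries)


def app_env(
    base: Mapping[str, str],
    unwanted: str,
    conf_dir: Optional[str] = None,
) -> Dict[str, str]:
    """Build a per-application environment for a spark-submit child process."""
    env = dict(base)
    env["SPARK_DIST_CLASSPATH"] = classpath_without(
        env.get("SPARK_DIST_CLASSPATH", ""), unwanted
    )
    if conf_dir is not None:
        # Point Spark at a per-application conf directory (with its own
        # spark-env.sh) instead of the shared $SPARK_HOME/conf.
        env["SPARK_CONF_DIR"] = conf_dir
    return env


if __name__ == "__main__":
    env = app_env(os.environ, unwanted="some-lib")
    print(env["SPARK_DIST_CLASSPATH"])
```

The resulting dictionary can be passed as `env=` to `subprocess.run` when launching spark-submit. One caveat: if the shared spark-env.sh exports `SPARK_DIST_CLASSPATH` unconditionally, it will overwrite the value set here when the launcher sources it, which is why the sketch also allows a separate `SPARK_CONF_DIR`.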
Re: Spark structured streaming + offset management in kafka + kafka headers
Hi, Just to add to Gabor's excellent answer: checkpointing and offsets are infrastructure-related and should not really be in the hands of Spark devs, who should instead focus on the business purpose of the code (not offsets, which are very low-level and not really important). BTW, that's what happens in Kafka Streams too. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi wrote: > There is no way to store offsets in Kafka and restart from the stored > offset. Structured Streaming stores offsets in the checkpoint and restarts > from there without any user code. > > Offsets can be stored with a listener but they can only be used for lag > calculation. > > BR, > G > > > On Sat, 3 Apr 2021, 21:09 Ali Gouta, wrote: > >> Hello, >> >> I was reading the Spark docs about Spark Structured Streaming, since we >> are thinking about updating our code base that today uses DStreams, hence >> Spark Streaming. Also, one main reason for this change is that reading >> headers in Kafka messages is only supported in >> Spark Structured Streaming and not in DStreams. >> >> I was surprised not to see an obvious way to handle the offsets manually >> by committing them to Kafka. In Spark Streaming we used to do it >> with something similar to these lines of code: >> >> stream.foreachRDD { rdd => >> val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges >> >> // some time later, after outputs have completed >> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)} >> >> >> And this works perfectly! In particular, it works very nicely in case of >> job failure/restart... I am wondering how this can be achieved in Spark >> Structured Streaming?
>> >> I read about checkpoints, and this reminds me the old way of doing things >> in spark 1.5/kafka0.8 and is not perfect since we are not deciding when to >> commit offsets by ourselves. >> >> Did I miss anything ? What would be the best way of committing offsets to >> kafka with spark structured streaming to the concerned consumer group ? >> >> Best regards, >> Ali Gouta. >> >
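Gabor's remark about a listener can be made concrete for the lag-calculation case. A streaming query listener receives a progress event per micro-batch, and each source in it reports its end offsets as a JSON string. The Python sketch below only parses such a string. The function name `kafka_offsets` is hypothetical, and the assumed JSON shape (topic, then partition, then offset) should be checked against what your Spark version actually reports.

```python
import json
from typing import List, Tuple


def kafka_offsets(end_offset_json: str) -> List[Tuple[str, int, int]]:
    """Flatten an end-offset JSON string into (topic, partition, offset) rows.

    Assumed input shape: {"topic": {"partition": offset, ...}, ...}
    """
    parsed = json.loads(end_offset_json)
    rows: List[Tuple[str, int, int]] = []
    for topic, partitions in parsed.items():
        for partition, offset in partitions.items():
            rows.append((topic, int(partition), int(offset)))
    return sorted(rows)


if __name__ == "__main__":
    for row in kafka_offsets('{"events": {"1": 42, "0": 17}}'):
        print(row)
```

Rows like these could be written to a monitoring store or compared with the latest broker offsets to estimate lag. As the thread points out, they are for observation only: on restart the query resumes from its checkpoint, not from anything recorded this way.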
Re: Writing to Google Cloud Storage with v2 algorithm safe?
Hi Vaquar, Thanks a lot! Accepted as the answer (yet there was the other answer that was very helpful too). Tons of reading ahead to understand it more. That once again makes me feel that Hadoop MapReduce experience would help a great deal (and I've got none). Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Sun, Apr 4, 2021 at 7:28 AM vaquar khan wrote: > Hi Jacek, > > I have answered; I hope you find it useful. > > Regards, > Vaquar Khan > > On Sat, Apr 3, 2021 at 11:19 AM Jacek Laskowski wrote: > >> Hi, >> >> I've just posted a question on StackOverflow [1] about the safety of the >> v2 algorithm while writing out to Google Cloud Storage. I think I'm missing >> some fundamentals on how cloud object stores work (GCS in particular) and >> hence the question. >> >> Is this all about File.rename and how many HTTP calls are there under the >> covers? How to know it for GCS? >> >> Thank you for any help you can provide. Merci beaucoup mes amis :) >> >> [1] https://stackoverflow.com/q/66933229/1305344 >> >> Pozdrawiam, >> Jacek Laskowski >> >> https://about.me/JacekLaskowski >> "The Internals Of" Online Books <https://books.japila.pl/> >> Follow me on https://twitter.com/jaceklaskowski >> >> <https://twitter.com/jaceklaskowski> >>
Re: Source.getBatch and schema vs qe.analyzed.schema?
Hi Bartosz, This is not a question about whether the data source supports fixed or user-defined schema but what schema to use when requested for a streaming batch in Source.getBatch. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Wed, Mar 31, 2021 at 7:44 PM Bartosz Konieczny wrote: > Hi Jacek, > > An interesting question! I don't know the exact answer and will be happy > to learn by the way :) Below you can find my understanding for these 2 > things, hoping it helps a little. > > For me, we can distinguish 2 different source categories. The first of > them is a source with some fixed schema. A good example is Apache Kafka > which exposes the topic name, key, value and you can't change that; it's > always the same, whenever you run the reader in Company A or in Company B. > What changes is the data extraction logic from the key, value or headers. > But it's business-specific, not data store-specific. You can find the > schema implementation here: Kafka > <https://github.com/apache/spark/blob/v3.1.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala#L122> > > The second type is a source with user-defined schema, like a RDBMS table > or a NoSQL schemaless store. Here, predicting the schema will not only be > business-specific, but also data store-specific; you can set any name for a > Primary Key column, there is no such rule like "key" or "value" in Kafka. > To avoid runtime errors (= favor fail-fast approach before the data is > read), Spark can use the metadata to assert (analyze) the schema specified > by the user to confirm it or fail fast before reading the data. > > Best, > Bartosz. > > > > On Mon, Mar 29, 2021 at 1:07 PM Jacek Laskowski wrote: > >> Hi, >> >> I've been developing a data source with a source and sink for Spark >> Structured Streaming. 
>> >> I've got a question about Source.getBatch [1]: >> >> def getBatch(start: Option[Offset], end: Offset): DataFrame >> >> getBatch returns a streaming DataFrame between the offsets so the idiom >> (?) is to have a code as follows: >> >> val relation = new MyRelation(...)(sparkSession) >> val plan = LogicalRelation(relation, isStreaming = true) >> new Dataset[Row](sparkSession, plan, RowEncoder(schema)) >> >> Note the use of schema [2] that is another part of the Source abstraction: >> >> def schema: StructType >> >> This is the "source" of my question. Is the above OK in a streaming sink >> / Source.getBatch? >> >> Since there are no interim operators that could change attributes >> (schema) I think it's OK. >> >> I've seen the following code and that made me wonder whether it's better >> or not compared to the solution above: >> >> val relation = new MyRelation(...)(sparkSession) >> val plan = LogicalRelation(relation, isStreaming = true) >> >> // When would we have to execute plan? >> val qe = sparkSession.sessionState.executePlan(plan) >> new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema)) >> >> When would or do we have to use qe.analyzed.schema vs simply schema? >> Could this qe.analyzed.schema help avoid some edge cases and is a preferred >> approach? >> >> Thank you for any help you can offer. Much appreciated. 
>> >> [1] >> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L61 >> [2] >> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L35 >> >> Pozdrawiam, >> Jacek Laskowski >> >> https://about.me/JacekLaskowski >> "The Internals Of" Online Books <https://books.japila.pl/> >> Follow me on https://twitter.com/jaceklaskowski >> >> <https://twitter.com/jaceklaskowski> >> > > > -- > Bartosz Konieczny > data engineer > https://www.waitingforcode.com > https://github.com/bartosz25/ > https://twitter.com/waitingforcode > >
Re: Writing to Google Cloud Storage with v2 algorithm safe?
Hi, From https://spark.apache.org/docs/3.1.1/cloud-integration.html#recommended-settings-for-writing-to-object-stores : > For object stores whose consistency model means that rename-based commits are safe use the FileOutputCommitter v2 algorithm for performance; v1 for safety. It is the meaning of "safe" and "safety" in that sentence that my question is about. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Sat, Apr 3, 2021 at 7:49 PM Mich Talebzadeh wrote: > Hi Jacek, > > Can you please clarify your question? > > with regard to your point: > > "... I think I'm missing some fundamentals on how cloud object stores work > (GCS in particular) and hence the question." > > The end result is the safe storage of data in object storage in GCP right? > > HTH > > > > On Sat, 3 Apr 2021 at 17:13, Jacek Laskowski wrote: > >> Hi, >> >> I've just posted a question on StackOverflow [1] about the safety of the >> v2 algorithm while writing out to Google Cloud Storage. I think I'm missing >> some fundamentals on how cloud object stores work (GCS in particular) and >> hence the question. >> >> Is this all about File.rename and how many HTTP calls are there under the >> covers? How to know it for GCS? >> >> Thank you for any help you can provide.
Merci beaucoup mes amis :) >> >> [1] https://stackoverflow.com/q/66933229/1305344 >> >> Pozdrawiam, >> Jacek Laskowski >> >> https://about.me/JacekLaskowski >> "The Internals Of" Online Books <https://books.japila.pl/> >> Follow me on https://twitter.com/jaceklaskowski >> >> <https://twitter.com/jaceklaskowski> >> >
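For reference, the setting that the quoted documentation sentence is about can be written down as a tiny helper. This Python sketch is only an illustration of the trade-off: the function name `committer_conf` and its boolean parameter are made up, and whether rename-based commits are safe on GCS is exactly the open question of this thread, so the sketch takes that as an input rather than answering it.

```python
from typing import Dict

# Configuration key as listed in Spark's cloud-integration documentation.
COMMITTER_KEY = "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version"


def committer_conf(rename_based_commits_safe: bool) -> Dict[str, str]:
    """Pick the FileOutputCommitter algorithm version.

    v2 favours performance and is only suggested when rename-based commits
    are safe on the target store; v1 is the conservative choice.
    """
    version = "2" if rename_based_commits_safe else "1"
    return {COMMITTER_KEY: version}


if __name__ == "__main__":
    print(committer_conf(rename_based_commits_safe=False))
```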
Writing to Google Cloud Storage with v2 algorithm safe?
Hi, I've just posted a question on StackOverflow [1] about the safety of the v2 algorithm while writing out to Google Cloud Storage. I think I'm missing some fundamentals on how cloud object stores work (GCS in particular) and hence the question. Is this all about File.rename and how many HTTP calls are there under the covers? How to know it for GCS? Thank you for any help you can provide. Merci beaucoup mes amis :) [1] https://stackoverflow.com/q/66933229/1305344 Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski>
Source.getBatch and schema vs qe.analyzed.schema?
Hi, I've been developing a data source with a source and sink for Spark Structured Streaming. I've got a question about Source.getBatch [1]: def getBatch(start: Option[Offset], end: Offset): DataFrame getBatch returns a streaming DataFrame between the offsets so the idiom (?) is to have a code as follows: val relation = new MyRelation(...)(sparkSession) val plan = LogicalRelation(relation, isStreaming = true) new Dataset[Row](sparkSession, plan, RowEncoder(schema)) Note the use of schema [2] that is another part of the Source abstraction: def schema: StructType This is the "source" of my question. Is the above OK in a streaming sink / Source.getBatch? Since there are no interim operators that could change attributes (schema) I think it's OK. I've seen the following code and that made me wonder whether it's better or not compared to the solution above: val relation = new MyRelation(...)(sparkSession) val plan = LogicalRelation(relation, isStreaming = true) // When would we have to execute plan? val qe = sparkSession.sessionState.executePlan(plan) new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema)) When would or do we have to use qe.analyzed.schema vs simply schema? Could this qe.analyzed.schema help avoid some edge cases and is a preferred approach? Thank you for any help you can offer. Much appreciated. [1] https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L61 [2] https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L35 Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski>
Re: [k8s] PersistentVolumeClaim support in 3.1.1 on minikube
Hi, I think I found it. I should be using OnDemand claim name so it gets replaced to be unique per executor (?) Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Mon, Mar 15, 2021 at 8:36 PM Jacek Laskowski wrote: > Hi, > > I've been toying with persistent volumes using Spark 3.1.1 on minikube and > am wondering whether it's a supported platform. I'd not be surprised if not > given all the surprises I've been experiencing lately. > > Can I use spark-shell or any Spark app in client mode with PVCs with the > default 2 executors? Should the following work if I removed --num-executors > 1? > > ./bin/spark-shell \ > --master k8s://$K8S_SERVER \ > --num-executors 1 \ > --conf > spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.mount.path=$MOUNT_PATH > \ > --conf > spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.claimName=$PVC_CLAIM_NAME > \ > --conf > spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.storageClass=$PVC_STORAGE_CLASS > \ > --conf > spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.sizeLimit=$PVC_SIZE_LIMIT > \ > --conf spark.kubernetes.container.image=$IMAGE_NAME \ > --conf spark.kubernetes.context=minikube \ > --conf spark.kubernetes.namespace=spark-demo \ > --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ > --verbose > > The whole demo is available at > https://jaceklaskowski.github.io/spark-kubernetes-book/demo/persistentvolumeclaims/ > > Please help. Thank you! > > Pozdrawiam, > Jacek Laskowski > > https://about.me/JacekLaskowski > "The Internals Of" Online Books <https://books.japila.pl/> > Follow me on https://twitter.com/jaceklaskowski > > <https://twitter.com/jaceklaskowski> >
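The change described above amounts to swapping the fixed claim name for the special value `OnDemand`, so that each executor gets its own claim. A small Python sketch of the resulting `--conf` arguments follows. The helper name `on_demand_pvc_args` and the sample values in the usage (volume name, mount path, storage class, size) are assumptions for illustration; the property names are the ones used in the original command.

```python
from typing import List

PREFIX = "spark.kubernetes.executor.volumes.persistentVolumeClaim"


def on_demand_pvc_args(
    volume_name: str,
    mount_path: str,
    storage_class: str,
    size_limit: str,
) -> List[str]:
    """Build spark-submit/spark-shell --conf arguments for an on-demand PVC."""
    settings = {
        f"{PREFIX}.{volume_name}.mount.path": mount_path,
        # "OnDemand" asks Spark to create a separate claim per executor
        # instead of reusing one pre-created claim for all of them.
        f"{PREFIX}.{volume_name}.options.claimName": "OnDemand",
        f"{PREFIX}.{volume_name}.options.storageClass": storage_class,
        f"{PREFIX}.{volume_name}.options.sizeLimit": size_limit,
    }
    args: List[str] = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    print(" \\\n  ".join(on_demand_pvc_args("data", "/data", "standard", "1Gi")))
```

With a fixed claim name, two executors would compete for the same claim, which may explain why the original command only worked with `--num-executors 1`. That reading is an inference from the thread, not something confirmed in it.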
[k8s] PersistentVolumeClaim support in 3.1.1 on minikube
Hi, I've been toying with persistent volumes using Spark 3.1.1 on minikube and am wondering whether it's a supported platform. I'd not be surprised if not given all the surprises I've been experiencing lately. Can I use spark-shell or any Spark app in client mode with PVCs with the default 2 executors? Should the following work if I removed --num-executors 1? ./bin/spark-shell \ --master k8s://$K8S_SERVER \ --num-executors 1 \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.mount.path=$MOUNT_PATH \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.claimName=$PVC_CLAIM_NAME \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.storageClass=$PVC_STORAGE_CLASS \ --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.sizeLimit=$PVC_SIZE_LIMIT \ --conf spark.kubernetes.container.image=$IMAGE_NAME \ --conf spark.kubernetes.context=minikube \ --conf spark.kubernetes.namespace=spark-demo \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ --verbose The whole demo is available at https://jaceklaskowski.github.io/spark-kubernetes-book/demo/persistentvolumeclaims/ Please help. Thank you! Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski>
Re: Spark 3.0.1 | Volume to use For Spark Kubernetes Executor Part Files Storage
Hi, On GCP I'd go for buckets in Google Cloud Storage. Not sure how reliable it is in production deployments though. Only demo experience here. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Mon, Mar 8, 2021 at 12:33 PM Ranju Jain wrote: > Hi Jacek, > > I am using this property > spark.kubernetes.executor.deleteOnTermination=true only to troubleshoot, > else I am freeing up resources after executors complete their job. > > Now I want to use some shared storage which can be shared by all executors > to write the part files. > > Which Kubernetes storage should I go for? > > Regards > > Ranju > > *From:* Jacek Laskowski > *Sent:* Monday, March 8, 2021 4:14 PM > *To:* Ranju Jain > *Cc:* Attila Zsolt Piros ; > user@spark.apache.org > *Subject:* Re: Spark 3.0.1 | Volume to use For Spark Kubernetes Executor > Part Files Storage > > Hi, > > > as Executors terminates after their work completes. > > --conf spark.kubernetes.executor.deleteOnTermination=false ? > > Pozdrawiam, > > Jacek Laskowski > > https://about.me/JacekLaskowski > > "The Internals Of" Online Books <https://books.japila.pl/> > > Follow me on https://twitter.com/jaceklaskowski > > <https://twitter.com/jaceklaskowski> > > On Sun, Mar 7, 2021 at 5:23 PM Ranju Jain > wrote: > > Hi, > > I need to save the data processed by the executors in the form of part files, > but I think a persistent volume is not an option for this as executors > terminate after their work completes. > > So I am thinking of using a shared volume across executor pods. > > Should I go with NFS or is there any other volume option to > explore as well? > > Regards > > Ranju > >
Re: Spark 3.0.1 | Volume to use For Spark Kubernetes Executor Part Files Storage
Hi, > as Executors terminates after their work completes. --conf spark.kubernetes.executor.deleteOnTermination=false ? Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Sun, Mar 7, 2021 at 5:23 PM Ranju Jain wrote: > Hi, > > > > I need to save the Executors processed data in the form of part files , > but I think persistent Volume is not an option for this as Executors > terminates after their work completes. > > So I am thinking to use shared volume across executor pods. > > > > Should I go with NFS or is there any other Volume option as well to > explore? > > > > Regards > > Ranju >
Re: Spark Kubernetes 3.0.1 | podcreationTimeout not working
Hi Ranju, Can you show the pods and their state? Does the situation happen at the very beginning of spark-submit or some time later (once you've got a couple of executors)? My understanding is that either the driver or the executors are not starting up due to lack of resources. In either case they're not deleted as they simply wait forever. I might be mistaken here though. Which property sets "this timeout of 60 sec."? Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Wed, Feb 10, 2021 at 2:03 PM Ranju Jain wrote: > Hi, > > > > I submitted the Spark job and the pods go into Pending state because of > insufficient resources. > > But they are not getting deleted after this timeout of 60 sec. Please help > me in understanding. > > > > Regards > > Ranju >
Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files
Hi Filip, Care to share the code behind "The only thing I found so far involves using foreachBatch and manually updating my aggregates."? I'm not completely sure I understand your use case and hope the code could shed more light on it. Thank you. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Thu, Jan 21, 2021 at 5:05 PM Filip wrote: > Hi, > > I'm considering using Apache Spark for the development of an application. > This would replace a legacy program which reads CSV files and does lots > (tens/hundreds) of aggregations on them. The aggregations are fairly > simple: > counts, sums, etc. while applying some filtering conditions on some of the > columns. > > I prefer using structured streaming for its simplicity and low latency. I'd > also like to use full SQL queries (via createOrReplaceTempView). However, > doing multiple queries means Spark will re-read the input files for each > one > of them. This seems very inefficient for my use case. > > Does anyone have any suggestions? The only thing I found so far involves > using foreachBatch and manually updating my aggregates. But, I think there > should be a simpler solution for this use case. >
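For readers wondering what the foreachBatch approach mentioned by Filip might look like, here is a rough Python sketch. It is not his code, which was never posted in this thread. The factory name `make_batch_handler`, the dictionary of aggregation functions and the Parquet output layout are all assumptions. The idea is to persist each micro-batch once, run every aggregation against the cached batch, and release it afterwards.

```python
from typing import Any, Callable, Dict


def make_batch_handler(
    aggregations: Dict[str, Callable[[Any], Any]],
    output_root: str,
) -> Callable[[Any, int], None]:
    """Build a foreachBatch callback that reads each micro-batch only once.

    aggregations maps an output name to a function that takes the batch
    DataFrame and returns an aggregated DataFrame.
    """

    def handle(batch_df: Any, batch_id: int) -> None:
        batch_df.persist()  # cache the batch so the input is not re-read
        try:
            for name, aggregate in aggregations.items():
                result = aggregate(batch_df)
                result.write.mode("append").parquet(f"{output_root}/{name}")
        finally:
            batch_df.unpersist()  # release the cached batch

    return handle
```

A query would use it as `df.writeStream.foreachBatch(make_batch_handler({...}, "/tmp/out")).start()`. Note that each write holds the aggregate of a single micro-batch only; combining them into running totals is the "manually updating my aggregates" part and is left out of this sketch.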
Re: Column-level encryption in Spark SQL
Hi, Never heard of it (and have once been tasked to explore a similar use case). I'm curious how you'd like it to work? (no idea how Hive does this either) Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Sat, Dec 19, 2020 at 2:38 AM john washington wrote: > Dear Spark team members, > > Can you please advise if Column-level encryption is available in Spark SQL? > I am aware that HIVE supports column level encryption. > > Appreciate your response. > > Thanks, > John >
Re: Process each kafka record for structured streaming
Hi, Can you use console sink and make sure that the pipeline shows some progress? Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Wed, Jan 20, 2021 at 10:44 AM rajat kumar wrote: > Hi, > > I want to apply custom logic for each row of data I am getting through > kafka and want to do it with microbatch. > When I am running it , it is not progressing. > > > kafka_stream_df \ > .writeStream \ > .foreach(process_records) \ > .outputMode("append") \ > .option("checkpointLocation", "checkpt") \ > .trigger(continuous="5 seconds").start() > > Regards > > Rajat > > >
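The console-sink check suggested above could look like the following Python sketch. The function name `start_console_debug` and its default arguments are made up for illustration; the builder calls are the standard PySpark `writeStream` ones. Note that it uses `trigger(processingTime=...)`: the quoted code asks for micro-batches but passes `trigger(continuous=...)`, which selects continuous processing instead, and the `foreach` sink may not be supported in that mode (worth verifying for your Spark version).

```python
from typing import Any


def start_console_debug(
    stream_df: Any,
    checkpoint_dir: str,
    interval: str = "5 seconds",
) -> Any:
    """Start a micro-batch query that prints incoming rows to the console.

    stream_df is expected to be a streaming DataFrame, e.g. one read from Kafka.
    """
    return (
        stream_df.writeStream
        .format("console")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir)
        .trigger(processingTime=interval)  # micro-batch trigger, not continuous
        .start()
    )
```

If rows show up on the console with this query, the source side works and the problem is more likely in the sink or trigger of the original pipeline. Use a fresh checkpoint directory for the debug query so that it does not interfere with the real one.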
Re: Application Timeout
Hi Brett,

No idea why it happens, but I got curious about the "Cores" column being 0. Is this always the case?

Regards,
Jacek Laskowski

On Tue, Jan 19, 2021 at 11:27 PM Brett Spark wrote:

> Hello!
> When using Spark Standalone with Spark 2.4.4 / 3.0.0, we are seeing our
> standalone Spark "applications" time out and show as "Finished" after
> around an hour.
>
> Here is a screenshot from the Spark master before it's marked as finished.
> [image: image.png]
> Here is a screenshot from the Spark master after it's marked as finished
> (after over an hour of idle time).
> [image: image.png]
> Here are the logs from the Spark Master / Worker:
>
> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master 2021-01-19 21:55:47,282 INFO master.Master: 172.32.3.66:34570 got disassociated, removing it.
> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master 2021-01-19 21:55:52,095 INFO master.Master: 172.32.115.115:36556 got disassociated, removing it.
> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master 2021-01-19 21:55:52,095 INFO master.Master: 172.32.115.115:37305 got disassociated, removing it.
> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master 2021-01-19 21:55:52,096 INFO master.Master: Removing app app-20210119204911-
> spark-worker-2d733568b2a7e82de7b2b09b6daa17e9-7bbb75f9b6-8mv2b worker 2021-01-19 21:55:52,112 INFO shuffle.ExternalShuffleBlockResolver: Application app-20210119204911- removed, cleanupLocalDirs = true
>
> Is there a setting that causes an application to time out after an hour of
> a Spark application or Spark worker being idle?
>
> I would like to keep our Spark applications alive as long as possible.
>
> I haven't been able to find a setting in the Spark configuration
> documentation that corresponds to this, so I'm wondering if this is
> something that's hard-coded.
>
> Please let me know,
> Thank you!
Re: Only one Active task in Spark Structured Streaming application
Hi,

I'd look at stages and jobs as it's possible that the only task running is the missing one in a stage of a job. Just guessing...

Regards,
Jacek Laskowski

On Thu, Jan 21, 2021 at 12:19 PM Eric Beabes wrote:

> Hello,
>
> My Spark Structured Streaming application was performing well for quite
> some time but all of a sudden from today it has slowed down. I noticed in
> the Spark UI that the 'No. of Active Tasks' is 1 even though 64 Cores are
> available. (Please see the attached image).
>
> I don't believe there's any data skew issue related to partitioning of
> data. What could be the reason for this? Please advise. Thanks.
Re: Query on entrypoint.sh Kubernetes spark
Hi,

I'm a beginner in Spark on Kubernetes so bear with me and watch out for possible mistakes :)

The key to understanding entrypoint.sh and --deploy-mode client is to think about the environment where the script is executed. That's k8s already, where the Docker image is brought to life as a container of a driver pod. Inside that container there's no point using cluster deploy mode...ever.

Makes sense?

Regards,
Jacek Laskowski

On Thu, Jan 21, 2021 at 10:40 AM Sachit Murarka wrote:

> Hi All,
>
> To run Spark on Kubernetes, I see the following lines in the entrypoint.sh
> script available:
>
> case "$1" in
>   driver)
>     shift 1
>     CMD=(
>       "$SPARK_HOME/bin/spark-submit"
>       --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
>       --deploy-mode client
>
> Could you please suggest why deploy-mode client is mentioned in
> entrypoint.sh? I am running spark-submit using deploy mode cluster, but
> inside entrypoint.sh it is mentioned like that.
>
> Kind Regards,
> Sachit Murarka
Re: RDD filter in for loop gave strange results
Hi Marco,

A Scala dev here.

In short: yet another reason against Python :)

Honestly, I've got no idea why the code gives the output. Ran it with 3.1.1-rc1 and got the very same results. Hoping pyspark/python devs will chime in and shed more light on this.

Regards,
Jacek Laskowski

On Wed, Jan 20, 2021 at 2:07 PM Marco Wong wrote:

> Dear Spark users,
>
> I ran the Python code below on a simple RDD, but it gave strange results.
> The filtered RDD contains non-existent elements which were filtered away
> earlier. Any idea why this happened?
> ```
> rdd = spark.sparkContext.parallelize([0,1,2])
> for i in range(3):
>     print("RDD is ", rdd.collect())
>     print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
>     rdd = rdd.filter(lambda x:x!=i)
>     print("Result is ", rdd.collect())
>     print()
> ```
> which gave
> ```
> RDD is [0, 1, 2]
> Filtered RDD is [1, 2]
> Result is [1, 2]
>
> RDD is [1, 2]
> Filtered RDD is [0, 2]
> Result is [0, 2]
>
> RDD is [0, 2]
> Filtered RDD is [0, 1]
> Result is [0, 1]
> ```
>
> Thanks,
>
> Marco
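A probable explanation, offered as a sketch rather than a confirmed diagnosis: a Python lambda looks up `i` when it runs, not when it is defined, and RDD transformations are lazy, so every chained `filter` is evaluated with whatever value `i` holds at the time of `collect()`. The plain-Python analogue below reproduces the pattern without Spark. `apply_filters` is a hypothetical helper standing in for the lazy evaluation, and binding the value through a default argument (`i=i`) is one common way to avoid the effect.

```python
def apply_filters(data, predicates):
    """Keep elements that pass every predicate, evaluated only now (like chained lazy filters)."""
    return [x for x in data if all(p(x) for p in predicates)]


data = [0, 1, 2]

# Late binding: each lambda reads the *current* value of i when it is called.
late = []
for i in range(3):
    late.append(lambda x: x != i)
late_result = apply_filters(data, late)    # all three lambdas see i == 2

# Early binding: the default argument freezes the value of i per iteration.
early = []
for i in range(3):
    early.append(lambda x, i=i: x != i)
early_result = apply_filters(data, early)  # 0, 1 and 2 are each filtered out

print(late_result)   # [0, 1]
print(early_result)  # []
```

The late-binding result matches the last "Result is [0, 1]" in the question. In the PySpark loop the corresponding change would be `rdd.filter(lambda x, i=i: x != i)`.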
Re: Spark RDD + HBase: adoption trend
Hi Marco,

IMHO the RDD API is only for very sophisticated use cases that very few Spark devs would be capable of handling. I consider the RDD API a sort of Spark assembler, and most Spark devs should stick to the Dataset API.

Speaking of HBase, see https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/bigtable/spark where you can find a demo that I worked on last year. It confirmed that:

"Apache HBase™ Spark Connector implements the DataSource API for Apache HBase and allows executing relational queries on data stored in Cloud Bigtable."

That makes hbase-rdd even more obsolete, but not necessarily unusable (I know too little about the HBase space to comment on this).

I think you should consider merging your hbase-rdd project with the official Apache HBase™ Spark Connector at https://github.com/apache/hbase-connectors/tree/master/spark (as they seem to lack active development IMHO).

Regards,
Jacek Laskowski

On Wed, Jan 20, 2021 at 2:44 PM Marco Firrincieli wrote:

> Hi, my name is Marco and I'm one of the developers behind
> https://github.com/unicredit/hbase-rdd
> a project we are currently reviewing for various reasons.
>
> We were basically wondering if RDD "is still a thing" nowadays (we see
> lots of usage for DataFrames or Datasets) and we're not sure how much of
> the community still works with or uses RDDs.
>
> Also, for lack of time, we always mainly worked using Cloudera-flavored
> Hadoop/HBase & Spark versions. We were thinking the community would then
> help us organize the project in a more "generic" way, but that didn't
> happen.
>
> So I figured I would ask here what the gut feeling of the Spark
> community is, so as to better define the future of our little library.
>
> Thanks
>
> -Marco
Re: Spark Event Log Forwarding and Offset Tracking
Hi,

> Forwarding Spark Event Logs to identify critical events like job start,
> executor failures, job failures etc to ElasticSearch via log4j. However I
> could not find any way to forward event log via log4j configurations. Is
> there any other recommended approach to track these application events?

I'd use the SparkListener API (http://spark.apache.org/docs/latest/api/scala/org/apache/spark/scheduler/SparkListener.html).

> 2 - For Spark streaming jobs, is there any way to identify that data from
> Kafka is not consumed for whatever reason, or the offsets are not
> progressing as expected and also forward that to ElasticSearch via log4j
> for monitoring

I think the SparkListener API would help here too.

Regards,
Jacek Laskowski

On Wed, Jan 13, 2021 at 5:15 PM raymond.tan wrote:

> Hello here, I am new to Spark and am trying to add some monitoring for
> Spark applications, specifically to handle the situations below:
>
> 1 - Forwarding Spark Event Logs to identify critical events like job start,
> executor failures, job failures etc to ElasticSearch via log4j. However I
> could not find any way to forward event log via log4j configurations. Is
> there any other recommended approach to track these application events?
>
> 2 - For Spark streaming jobs, is there any way to identify that data from
> Kafka is not consumed for whatever reason, or the offsets are not
> progressing as expected and also forward that to ElasticSearch via log4j
> for monitoring
>
> Thanks, Raymond
Re: understanding spark shuffle file re-use better
Hi,

An interesting question that I must admit I'm not sure how to answer myself :)

Off the top of my head, I'd **guess** that unless you cache the first query these two queries would share nothing. With caching, there's a phase in query execution when a canonicalized version of a query is used to look up any cached queries.

Again, I'm not really sure, and if I had to answer it (e.g. as part of an interview) I'd say nothing would be shared / re-used.

Regards,
Jacek Laskowski

On Wed, Jan 13, 2021 at 5:39 PM Koert Kuipers wrote:

> is shuffle file re-use based on identity or equality of the dataframe?
>
> for example if run the exact same code twice to load data and do
> transforms (joins, aggregations, etc.) but without re-using any actual
> dataframes, will i still see skipped stages thanks to shuffle file re-use?
>
> thanks!
> koert
Re: Dynamic Spark metrics creation
Hey Yurii,

> which is unavailable from executors.

Register it on the driver and use accumulators on executors to update the values (on the driver)?

Regards,
Jacek Laskowski

On Sat, Jan 16, 2021 at 2:21 PM Yuri Oleynikov (יורי אולייניקוב) wrote:

> Hi all,
> I have a Spark application with Arbitrary Stateful Aggregation implemented
> with FlatMapGroupsWithStateFunction.
>
> I want to collect some statistics about incoming events inside
> FlatMapGroupsWithStateFunction.
> The statistics are based on an event property which on the one hand has
> dynamic values but on the other hand takes a small finite (though unknown)
> set of values (e.g. country name).
>
> So I thought to register dynamic metrics inside
> FlatMapGroupsWithStateFunction but as far as I understand, this requires
> accessing MetricsSystem via SparkEnv.get() which is unavailable from
> executors.
>
> Any thoughts/suggestions?
>
> With best regards,
> Yurii
Re: Insertable records in Datasource v2.
Hi Rahul,

I think it's not and will not be supported. You should report an issue in JIRA at https://issues.apache.org/jira/projects/SPARK, but don't expect a solution with temporary views though.

I did manage to reproduce the AnalysisException: unresolved operator, and even though INSERT INTO views is not allowed (according to ResolveRelations) that check did not get triggered. Just like you I thought I could get past this exception with InsertableRelation, but alas it didn't work either.

The temporary view based on the custom data source is indeed resolved, but the InsertIntoTable logical operator is not, which leads to the exception you've been facing:

    unresolved operator 'InsertIntoTable RelationV2 custom[id#0L, name#1, v#2] (Options: [paths=[]]), false, false;;
    'InsertIntoTable RelationV2 custom[id#0L, name#1, v#2] (Options: [paths=[]]), false, false
    +- Project [_1#56 AS id#60, _2#57 AS name#61, _3#58 AS v#62]
       +- LocalRelation [_1#56, _2#57, _3#58]

By the way, this insert could also be expressed using the Scala API as follows:

    Seq((2, "insert_record1", 200), (20001, "insert_record2", 201))
      .toDF(insertDFWithSchema.schema.names: _*)
      .write
      .insertInto(sqlView)

In summary, you should report this to JIRA, but don't expect it to get fixed other than by catching this case and throwing this exception from ResolveRelations: "Inserting into a view is not allowed".

Unless I'm mistaken...

Regards,
Jacek Laskowski

On Thu, Jan 14, 2021 at 3:52 AM Rahul Kumar wrote:

> I'm implementing V2 datasource for a custom datasource.
>
> I'm trying to insert a record into a temp view, in the following fashion.
>
> insertDFWithSchema.createOrReplaceTempView(sqlView)
> spark.sql(s"insert into $sqlView values (2, 'insert_record1', 200, 23000), (20001, 'insert_record2', 201, 23001)")
>
> where insertDFWithSchema is some dataframe loaded from the custom data
> source.
>
> I end up getting the following exception:
>
> org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable RelationV2 mydb[id#63, name#64, age#65, salary#66] (Options: [mydb.updateByKey=id,mydb.namespace=test,paths=[],mydb.set=input_data,mydb.se...), false, false;;
> 'InsertIntoTable RelationV2 mydb[id#63, name#64, age#65, salary#66] (Options: [mydb.updateByKey=id,mydb.namespace=test,paths=[],mydb.set=input_data,mydb.se...), false, false
> +- LocalRelation [col1#88, col2#89, col3#90, col4#91]
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
>   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)
>   ... 40 elided
>
> In the V1 datasource implementation, I had the insertable trait in
> BaseRelation. In V2, I'm not sure how it could be achieved. I have also
> tried implementing the insertable trait in DefaultSource. Any input would
> be extremely helpful.
>
> Thanks,
> Rahul
Re: Data source v2 streaming sinks does not support Update mode
Hi,

Can you post the whole message? I'm trying to find what might be causing it. A small reproducible example would be of help too. Thank you.

Regards,
Jacek Laskowski

On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes wrote:

> Trying to port my Spark 2.4 based (Structured) streaming application to
> Spark 3.0. I compiled it using the dependency given below:
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>   <version>3.1.0</version>
> </dependency>
>
> Every time I run it under Spark 3.0, I get this message: Data source v2
> streaming sinks does not support Update mode
>
> I am using 'mapGroupsWithState' so as per this link
> (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
> the only supported Output mode is "Update".
>
> My Sink is a Kafka topic so I am using this:
>
> .writeStream
> .format("kafka")
>
> What am I missing?
Re: Converting spark batch to spark streaming
Hi,

Start with DataStreamWriter.foreachBatch.

Regards,
Jacek Laskowski

On Thu, Jan 7, 2021 at 6:55 PM mhd wrk wrote:

> I'm trying to convert a spark batch application to a streaming application
> and wondering what function (or design pattern) I should use to execute a
> series of operations inside the driver upon arrival of each message (a text
> file inside an HDFS folder) before starting computation inside executors.
>
> Thanks,
> Mohammad
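A minimal PySpark sketch of the `foreachBatch` suggestion, under stated assumptions: `handle_batch` and `start_query` are hypothetical names, the output path is a placeholder, and the streaming DataFrame is assumed to be created elsewhere (for the HDFS folder in the question, something like `spark.readStream.text(...)` would be one option).

```python
def handle_batch(batch_df, batch_id):
    """Called on the driver once per micro-batch; batch_df is a regular (non-streaming) DataFrame."""
    # Driver-side steps go here, before any distributed work is started
    # (e.g. reading configuration or logging the batch id).
    print(f"processing micro-batch {batch_id}")
    # Work expressed on batch_df is then executed on the executors as usual.
    batch_df.write.mode("append").parquet("/tmp/out")  # hypothetical output path


def start_query(stream_df):
    """Attach handle_batch to a streaming DataFrame and start the query."""
    return stream_df.writeStream.foreachBatch(handle_batch).start()
```

The design point is that the function passed to `foreachBatch` runs in the driver for every micro-batch, so it is a natural place for the "series of operations inside the driver" mentioned in the question.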
Re: Impact of .localCheckpoint() and executor dying
Hi,

> impact of an executor dying after a localCheckpoint is taken.

My memory is a bit vague on this, but I'd not be surprised if this localCheckpoint-ed RDD would be "broken" and any actions would simply throw an exception like missing partitions or similar. There's no way back.

I wish someone with more skills in this area would chime in...

Regards,
Jacek Laskowski

On Wed, Jan 6, 2021 at 8:30 PM Brett Larson wrote:

> Jacek,
> Thanks for your response, I am still trying to understand the impact of an
> executor dying after a localCheckpoint is taken.
>
> Would the entire spark application fail in this case due to the broken
> lineage? Or would the jobs associated with that executor need to be
> re-computed from scratch?
>
> Thank you!
>
> --
> Brett Larson
Re: Impact of .localCheckpoint() and executor dying
Hi,

> My understanding is that .localCheckpoint() breaks the lineage of the RDD

True.

> and this requires that the entire RDD to be rebuild instead of being able
> to recompute lost partitions.

In a sense, it's as if you saved the partitions to executors and re-read them back as source data (for this checkpointed RDD).

> Does each executor store a copy of the entire RDD?

No. An executor has got only the data of the partitions (for the tasks this executor has executed).

> Checkpoint over .localCheckpoint.

checkpoint is similar to localCheckpoint, but slower and reliable (as it's on a stable HDFS file system, not on an ephemeral executor). In either case, the effect on the lineage should be the same: it is cut.

Regards,
Jacek Laskowski

On Wed, Jan 6, 2021 at 6:15 PM brettplarson wrote:

> Hello,
> I am wondering what the impact of using .localCheckpoint() and having the
> executor die would be?
>
> My understanding is that .localCheckpoint() breaks the lineage of the RDD
> and this requires that the entire RDD to be rebuild instead of being able
> to recompute lost partitions.
>
> Does each executor store a copy of the entire RDD?
>
> It's unclear to me the benefit of using Checkpoint over .localCheckpoint.
> (I am aware that this is HDFS backed, but it's unclear the implications of
> this)
>
> Please let me know,
> Thank you!
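The trade-off described in this message can be sketched in PySpark as follows. This is an illustration under assumptions, not a recommendation: `cut_lineage` is a hypothetical helper, and `rdd` is assumed to be an existing RDD.

```python
def cut_lineage(rdd, reliable=True):
    """Mark an RDD for checkpointing and materialize it.

    reliable=True  -> rdd.checkpoint(): partitions are written to the directory
                      configured with SparkContext.setCheckpointDir (e.g. on HDFS),
                      so they outlive any single executor.
    reliable=False -> rdd.localCheckpoint(): partitions stay in executor-local
                      storage; faster, but lost together with the executor.
    """
    if reliable:
        rdd.checkpoint()
    else:
        rdd.localCheckpoint()
    rdd.count()  # checkpointing is lazy; running an action triggers it
    return rdd
```

For the reliable variant, `sc.setCheckpointDir(...)` has to be called beforehand with a directory on a fault-tolerant file system. In both variants the lineage is truncated once the checkpoint is materialized, which is why a lost local checkpoint cannot be recomputed from the parents.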
Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)
Hi Rachana,

> Should I go backward and use Spark Streaming DStream based.

No. Never. It's no longer supported (and should really be removed from the codebase once and for all - dreaming...).

Spark focuses on Spark SQL and Spark Structured Streaming as user-facing modules for batch and streaming queries, respectively.

Please note that I'm not a PMC member or even a committer, so I'm speaking for myself only (not representing the project in an official way).

Regards,
Jacek Laskowski

On Thu, Jun 18, 2020 at 12:03 AM Rachana Srivastava wrote:

> Structured Streaming vs Spark Streaming (DStream)?
>
> Which is recommended for system stability? Exactly-once is NOT the first
> priority. The first priority is a STABLE system.
>
> I need to make a decision soon. I need help. Here is the question
> again. Should I go backward and use Spark Streaming DStream based. Write
> our own checkpoint and go from there. At least we never encounter these
> metadata issues there.
>
> Thanks,
>
> Rachana
>
> On Wednesday, June 17, 2020, 02:02:20 PM PDT, Jungtaek Lim wrote:
>
> Just in case anyone prefers ASF projects, there are other alternative
> projects in the ASF as well: alphabetically, Apache Hudi [1] and
> Apache Iceberg [2]. Both recently graduated as top-level projects.
> (DISCLAIMER: I'm not involved in either.)
>
> BTW it would be nice if we made the metadata implementation of the file
> stream source/sink pluggable. From what I've seen, the plugin approach has
> been selected as the way to go whenever some part is going to be
> complicated and it becomes arguable whether the part should be handled in
> the Spark project or outside it, e.g. checkpoint manager, state store
> provider, etc. It would open up chances for the ecosystem to play with the
> challenge "without completely re-writing the file stream source and sink",
> focusing on scalability of metadata in a long-running query. The
> alternative projects described above will still provide more higher-level
> features and look attractive, but sometimes it may be just "using a
> sledgehammer to crack a nut".
>
> 1. https://hudi.apache.org/
> 2. https://iceberg.apache.org/
>
> On Thu, Jun 18, 2020 at 2:34 AM Tathagata Das wrote:
>
> Hello Rachana,
>
> Getting exactly-once semantics on files and making it scale to a very
> large number of files are very hard problems to solve. While Structured
> Streaming + built-in file sink solves the exactly-once guarantee that
> DStreams could not, it is definitely limited in other ways (scaling in
> terms of files, combining batch and streaming writes in the same place,
> etc). And solving this problem requires a holistic solution that is
> arguably beyond the scope of the Spark project.
>
> There are other projects that are trying to solve this file management
> issue. For example, Delta Lake <https://delta.io/> (full disclosure, I am
> involved in it) was built to solve exactly this problem: get exactly-once
> and ACID guarantees on files, but also scale to handling millions of files.
> Please consider it as part of your solution.
>
> On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava wrote:
>
> I have written a simple Spark Structured Streaming app to move data from
> Kafka to S3. Found that in order to support the exactly-once guarantee
> Spark creates a _spark_metadata folder, which ends up growing too large as
> the streaming app is SUPPOSED TO run FOREVER. But when the streaming app
> runs for a long time the metadata folder grows so big that we start getting
> OOM errors. The only way to resolve the OOM is to delete the Checkpoint and
> Metadata folders and lose VALUABLE customer data.
>
> Spark has open JIRAs for this (SPARK-24295, SPARK-29995, SPARK-30462).
> Since Spark Streaming was NOT broken like this, is Spark Streaming a
> BETTER choice?
Re: BOOK review of Spark: WARNING to spark users
Hi Emma, I'm curious about the purpose of the email. Mind elaborating? Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski "The Internals Of" Online Books <https://books.japila.pl/> Follow me on https://twitter.com/jaceklaskowski <https://twitter.com/jaceklaskowski> On Wed, May 20, 2020 at 10:43 PM emma davis wrote: > > *Book:* Machine Learning with Apache Spark Quick Start Guide > *publisher* : packt> > > > *F**ollow**ing* this Getting Started with Python in VS Code > https://code.visualstudio.com/docs/python/python-tutorial > > I realised Jillur Qudus has written and published a book without any > knowledge > of subject matter, amongst other things Python. > > > > *Highlighted proof with further details further down the email. * > > import findspark # these lines of code are unnecessary see link above for > setup > findspark.init() > > Setting SPARK_HOME or any other spark variables are unnecessary because > Spark like any > frameworks is self contained and has its own conf directory for startup > persistent > configuration settings. > Obviously the software would find its own current directory upon starting > i.e. sbin/start-master.sh > > Spark is a BIG DATA tool ( heavy distributed ,parallelism processing) so > clearly you would expect its hello world demo programs to demonstrate > that. > > what is the point of setting num_samples=100. something like 10**10 would > make sense to test performance. > > > > *This is my warning do not end up wasting your valuable time as I did . I > fee your time is valuable. * > *I realise the scam as I got a better understanding of the product by just > doing the correct hello world program from correct source. * > > “Research by CISQ found that, in 2018, poor quality software cost > organizations $2.8 trillion in the US alone. “ > > I attribute this to the Indian IT industry claiming they can do job better > than the natives [US , Europeans.] Implying Indian Education or IT people > is superior. 
For example People like me born, live and educated in the > western Europe > > *https://www.it-cisq.org/the-cost-of-poor-quality-software-in-the-us-a-2018-report/The-Cost-of-Poor-Quality-Software-in-the-US-2018-Report.pdf > <https://www.it-cisq.org/the-cost-of-poor-quality-software-in-the-us-a-2018-report/The-Cost-of-Poor-Quality-Software-in-the-US-2018-Report.pdf>* > > > *Contributors: About the Author* > “*Jillur Qudus* is a lead technical architect, polygot software engineer > and data scientist > with over 10 years of hand-on experience in architecting and engineering > distributed, > scalable , high performance .. to combat serious organised crime. Jillur > has extensive experience working with government, intelligence,law > enforcement and banking, and has worked across the world including > Japan,Singapore,Malysia,Hong Kong and New Zealand .. founder of keisan, a > UK-based company specializing in open source distributed technologies and > machine learning…“ > This obviously means a lot to many but when I look at his work Judge for > yourself based on evidence. > > *Page 54* > * ”* > Additional Python Packages > > conda install -c conda-forge findspark > > conda install -c conda-forge pykafka > ...”** > > The remainder of the program was copied from spark website so that wasn’t > wrong. > *Page 63* > > * “* > > cd *etc*/profile.d > vi spark.sh > $ export SPARK_HOME=/opt/spark-2.3.2-bin-hadoop2.7 > > source spark.sh > > .. in order for the SPARK_HOME environment variable to be successfully > recognized and registered by findspark ... > …. > > We are now ready to write out first spark application in Python ! ….. > > # (1) import required Python dependencies > import findspark > findspark.init() > > (3) > …. > num_samples = 100 *“ *** > > > emma davis > emma.davi...@aol.com > >
Re: Spark Window Documentation
Hi Neeraj,

I'd start from "Contributing Documentation Changes" in https://spark.apache.org/contributing.html

Regards,
Jacek Laskowski

On Fri, May 8, 2020 at 10:37 PM neeraj bhadani wrote:

> Thanks Jacek for sharing the details. I could see some examples here
> https://github.com/apache/spark/blob/master/python/pyspark/sql/window.py#L83
> as mentioned in the original email, but I'm not sure where this is
> reflected in the Spark documentation. Also, what would be the process to
> contribute to the Spark docs? I checked the section "Contributing
> Documentation Changes" at https://spark.apache.org/contributing.html but
> couldn't find a way to contribute. I might be missing something here.
>
> If someone can help on how to contribute to the Spark docs, that would be
> great.
>
> Regards,
> Neeraj
>
> On Fri, May 8, 2020 at 12:39 PM Jacek Laskowski wrote:
>
>> Hi Neeraj,
>>
>> I'm not a committer so I might be wrong, but there is no "blessed way" to
>> include examples.
>>
>> There are some examples in the official documentation at
>> http://spark.apache.org/docs/latest/sql-programming-guide.html but this
>> is how to use the general concepts, not specific operators.
>>
>> There are some examples at http://spark.apache.org/examples.html
>>
>> I think the best way would be to include examples as close to the methods
>> as possible, and scaladoc/javadoc would be best IMHO.
>>
>> p.s. Just yesterday there was this thread "What open source projects have
>> the best docs?" on twitter @
>> https://twitter.com/adamwathan/status/1257641015835611138. You could
>> borrow some ideas from the docs that are claimed to be "the best".
>>
>> Regards,
>> Jacek Laskowski
>>
>> On Fri, May 8, 2020 at 11:34 AM neeraj bhadani wrote:
>>
>>> Hi Team,
>>> I was looking for a Spark window function example in the documentation.
>>>
>>> For example, I could see that the function definition and params are
>>> explained nicely here:
>>> https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Window.rowsBetween
>>>
>>> and this is the source, which is available since Spark version 2.1:
>>> https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/window.html#Window.rowsBetween
>>>
>>> But I couldn't find an example which helps to understand how it works.
>>>
>>> Although, while browsing the GitHub code, I found some examples here:
>>> https://github.com/apache/spark/blob/master/python/pyspark/sql/window.py#L83
>>>
>>> which I couldn't find on the official Spark doc page. Where and how is
>>> this example linked with the official Spark documentation?
>>>
>>> If such examples are not available, could you please share the process
>>> for contributing examples to the Spark documentation?
>>>
>>> Regards,
>>> Neeraj
Re: java.lang.OutOfMemoryError Spark Worker
Hi,

It's been a while since I worked with Spark Standalone, but I'd check the logs of the workers. How do you spark-submit the app?

Did you check the /grid/1/spark/work/driver-20200508153502-1291 directory?

Regards,
Jacek Laskowski

On Fri, May 8, 2020 at 2:32 PM Hrishikesh Mishra wrote:

> Thanks Jacek for the quick response.
> Due to our system constraints, we can't move to Structured Streaming now.
> But definitely YARN can be tried out.
>
> But my problem is I'm unable to figure out where the issue is: Driver,
> Executor, or Worker. Even the exceptions give no clue. Please see the
> exception below; I'm unable to spot the cause of the OOM.
>
> 20/05/08 15:36:55 INFO Worker: Asked to kill driver driver-20200508153502-1291
> 20/05/08 15:36:55 INFO DriverRunner: Killing driver process!
> 20/05/08 15:36:55 INFO CommandUtils: Redirection to /grid/1/spark/work/driver-20200508153502-1291/stderr closed: Stream closed
> 20/05/08 15:36:55 INFO CommandUtils: Redirection to /grid/1/spark/work/driver-20200508153502-1291/stdout closed: Stream closed
> 20/05/08 15:36:55 INFO ExternalShuffleBlockResolver: Application app-20200508153654-11776 removed, cleanupLocalDirs = true
> 20/05/08 15:36:55 INFO Worker: Driver driver-20200508153502-1291 was killed by user
> 20/05/08 15:43:06 WARN AbstractChannelHandlerContext: An exception 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
> java.lang.OutOfMemoryError: Java heap space
> 20/05/08 15:43:23 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[dispatcher-event-loop-6,5,main]
> java.lang.OutOfMemoryError: Java heap space
> 20/05/08 15:43:17 WARN AbstractChannelHandlerContext: An exception 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
> java.lang.OutOfMemoryError: Java heap space
> 20/05/08 15:43:33 INFO ExecutorRunner: Killing process!
> 20/05/08 15:43:33 INFO ExecutorRunner: Killing process!
> 20/05/08 15:43:33 INFO ExecutorRunner: Killing process!
> 20/05/08 15:43:33 INFO ShutdownHookManager: Shutdown hook called
> 20/05/08 15:43:33 INFO ShutdownHookManager: Deleting directory /grid/1/spark/local/spark-e045e069-e126-4cff-9512-d36ad30ee922
>
> On Fri, May 8, 2020 at 5:14 PM Jacek Laskowski wrote:
>
>> Hi,
>>
>> Sorry for being perhaps too harsh, but when you asked "Am I missing
>> something." and I noticed "Kafka Direct Stream" and "Spark Standalone
>> Cluster." I immediately thought "Yeah...please upgrade your Spark env to
>> use Spark Structured Streaming at the very least and/or use YARN as the
>> cluster manager".
>>
>> Another thought was that the user code (your code) could be leaking
>> resources so Spark eventually reports heap-related errors that may not
>> necessarily be Spark's.
>>
>> Regards,
>> Jacek Laskowski
>>
>> On Thu, May 7, 2020 at 1:12 PM Hrishikesh Mishra wrote:
>>
>>> Hi
>>>
>>> I am getting an out of memory error in the worker log of streaming jobs
>>> every couple of hours. After this the worker dies. There is no shuffle,
>>> no aggregation, no caching in the job; it's just a transformation.
>>> I'm not able to identify where the problem is, driver or executor. And
>>> why does the worker die after the OOM, when it is the streaming job that
>>> should die? Am I missing something.
>>> >>> Driver Memory: 2g >>> Executor memory: 4g >>> >>> Spark Version: 2.4 >>> Kafka Direct Stream >>> Spark Standalone Cluster. >>> >>> >>> 20/05/06 12:52:20 INFO SecurityManager: SecurityManager: authentication >>> disabled; ui acls disabled; users with view permissions: Set(root); groups >>> with view permissions: Set(); users with modify permissions: Set(root); >>> groups with modify permissions: Set() >>> >>> 20/05/06 12:53:03 ERROR Spark
Re: java.lang.OutOfMemoryError Spark Worker
Hi,

Sorry for being perhaps too harsh, but when you asked "Am I missing something." and I noticed this "Kafka Direct Stream" and "Spark Standalone Cluster." I immediately thought "Yeah...please upgrade your Spark env to use Spark Structured Streaming at the very least and/or use YARN as the cluster manager".

Another thought was that the user code (your code) could be leaking resources so Spark eventually reports heap-related errors that may not necessarily be Spark's.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Thu, May 7, 2020 at 1:12 PM Hrishikesh Mishra wrote:

> Hi
>
> I am getting an out of memory error in the worker log in streaming jobs every couple of hours. After this the worker dies. There is no shuffle, no aggregation, no caching in the job, it's just a transformation.
> I'm not able to identify where the problem is, driver or executor. And why does the worker die after the OOM, when only the streaming job should die? Am I missing something?
>
> Driver Memory: 2g
> Executor memory: 4g
>
> Spark Version: 2.4
> Kafka Direct Stream
> Spark Standalone Cluster.
>
> 20/05/06 12:52:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
> 20/05/06 12:53:03 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[ExecutorRunner for app-20200506124717-10226/0,5,main]
> java.lang.OutOfMemoryError: Java heap space
>     at org.apache.xerces.util.XMLStringBuffer.append(Unknown Source)
>     at org.apache.xerces.impl.XMLEntityScanner.scanData(Unknown Source)
>     at org.apache.xerces.impl.XMLScanner.scanComment(Unknown Source)
>     at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanComment(Unknown Source)
>     at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
>     at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
>     at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>     at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>     at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
>     at org.apache.xerces.parsers.DOMParser.parse(Unknown Source)
>     at org.apache.xerces.jaxp.DocumentBuilderImpl.parse(Unknown Source)
>     at javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:150)
>     at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2480)
>     at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2468)
>     at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2539)
>     at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>     at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>     at org.apache.hadoop.conf.Configuration.set(Configuration.java:1143)
>     at org.apache.hadoop.conf.Configuration.set(Configuration.java:1115)
>     at org.apache.spark.deploy.SparkHadoopUtil$.org$apache$spark$deploy$SparkHadoopUtil$$appendS3AndSparkHadoopConfigurations(SparkHadoopUtil.scala:464)
>     at org.apache.spark.deploy.SparkHadoopUtil$.newConfiguration(SparkHadoopUtil.scala:436)
>     at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:114)
>     at org.apache.spark.SecurityManager.<init>(SecurityManager.scala:114)
>     at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:149)
>     at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)
> 20/05/06 12:53:38 INFO DriverRunner: Worker shutting down, killing driver driver-20200505181719-1187
> 20/05/06 12:53:38 INFO DriverRunner: Killing driver process!
>
> Regards
> Hrishi
Re: Spark Window Documentation
Hi Neeraj,

I'm not a committer so I might be wrong, but there is no "blessed way" to include examples.

There are some examples in the official documentation at http://spark.apache.org/docs/latest/sql-programming-guide.html, but they show how to use the general concepts, not specific operators.

There are some more examples at http://spark.apache.org/examples.html

I think the best way would be to include examples as close to the methods as possible, and scaladoc/javadoc would be best IMHO.

p.s. Just yesterday there was this thread "What open source projects have the best docs?" on Twitter at https://twitter.com/adamwathan/status/1257641015835611138. You could borrow some ideas from the docs that are claimed to be "the best".

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Fri, May 8, 2020 at 11:34 AM neeraj bhadani wrote:

> Hi Team,
> I was looking for a Spark window function example in the documentation.
>
> For example, I could see that the function definition and params are explained nicely here:
> https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Window.rowsBetween
>
> and this is the source, which has been available since Spark version 2.1:
> https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/window.html#Window.rowsBetween
>
> But I couldn't find an example which helps to understand how it works.
>
> Although, while browsing the GitHub code I found an example here:
> https://github.com/apache/spark/blob/master/python/pyspark/sql/window.py#L83
>
> which I couldn't find on the official Spark doc page. Where and how is this example linked with the official Spark documentation?
>
> If such examples are not available, could you please share the process for contributing examples to the Spark documentation?
>
> Regards,
> Neeraj
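For what it's worth, below is a minimal sketch of the kind of example the question asks for, in Scala rather than PySpark. The data, the column names and the local SparkSession are made up for illustration; only Window.partitionBy, orderBy, rowsBetween, Window.unboundedPreceding and Window.currentRow come from the Spark API. It computes a running sum per key over the frame "all rows from the start of the partition up to the current row".

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Local session for illustration only (in spark-shell, `spark` already exists)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rows-between-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical input: a few values per key
val sales = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 10), ("b", 20)).toDF("key", "value")

// Frame: from the first row of the partition up to and including the current row
val runningWindow = Window
  .partitionBy("key")
  .orderBy("value")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// Running sum of `value` within each key
val runningTotals = sales.withColumn("running_sum", sum("value").over(runningWindow))
runningTotals.orderBy("key", "value").show()
```

Changing the frame to rowsBetween(-1, 1) would instead sum the previous, current and next row, which is probably the quickest way to see what the two arguments mean.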
Re: Release Apache Spark 2.4.4 before 3.0.0
Hi,

Thanks Dongjoon Hyun for stepping up as a release manager! Much appreciated.

If there's a volunteer to cut a release, I'm always happy to support it. In addition, the more frequent the releases, the better for end users, so they have a choice to upgrade and have all the latest fixes or wait. It's their call, not ours (when we'd keep them waiting).

My big 2 yes'es for the release!

Jacek

On Tue, 9 Jul 2019, 18:15 Dongjoon Hyun, wrote:

> Hi, All.
>
> Spark 2.4.3 was released two months ago (8th May).
>
> As of today (9th July), there exist 45 fixes in `branch-2.4` including the following correctness or blocker issues.
>
>   - SPARK-26038 Decimal toScalaBigInt/toJavaBigInteger not work for decimals not fitting in long
>   - SPARK-26045 Error in the spark 2.4 release package with the spark-avro_2.11 dependency
>   - SPARK-27798 from_avro can modify variables in other rows in local mode
>   - SPARK-27907 HiveUDAF should return NULL in case of 0 rows
>   - SPARK-28157 Make SHS clear KVStore LogInfo for the blacklist entries
>   - SPARK-28308 CalendarInterval sub-second part should be padded before parsing
>
> It would be great if we can have Spark 2.4.4 before we get busier with 3.0.0.
> If it's okay, I'd like to volunteer as the 2.4.4 release manager and roll it next Monday (15th July).
> What do you think about this?
>
> Bests,
> Dongjoon.
Re: Change parallelism number in Spark Streaming
Hi,

I've got a talk "The internals of stateful stream processing in Spark Structured Streaming" at https://dataxday.fr/ today and am going to include the tool on the slides to thank you for the work.

Thanks.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jun 27, 2019 at 3:32 AM Jungtaek Lim wrote:

> Glad to help, Jacek.
>
> I'm happy you're doing a similar thing, which means it could be pretty useful for others as well. Looks like it might be good enough to contribute the state source and sink. I'll sort out my code and submit a PR.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 27, 2019 at 7:54 AM Jacek Laskowski wrote:
>
>> Hi Jungtaek,
>>
>> That's very helpful to have the state source. As a matter of fact I've just this week been working on a similar tool (!) and have been wondering how to recreate the schema of the state key and value. You've helped me a lot. Thanks.
>>
>> Jacek
>>
>> On Wed, 26 Jun 2019, 23:58 Jungtaek Lim, wrote:
>>
>>> Hi,
>>>
>>> you could consider the state operator's partition number as the "max parallelism", as parallelism can be reduced by applying coalesce. It would effectively work similarly to key groups.
>>>
>>> If you're also considering offline queries, there's a tool to manipulate state which enables reading and writing state in Structured Streaming, achieving rescaling and schema evolution.
>>>
>>> https://github.com/HeartSaVioR/spark-state-tools
>>> (DISCLAIMER: I'm an author of this tool.)
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei wrote:
>>>
>>>> Thank you for your quick reply!
>>>>
>>>> Is there any plan to improve this?
>>>>
>>>> I asked this question due to some investigation on comparing those state-of-the-art streaming systems, among which Flink and DataFlow allow changing the parallelism number, and by my knowledge of Spark Streaming, it seems it is also able to do that: if some “key interval” concept is used, then state can somehow be decoupled from the partition number by consistent hashing.
>>>>
>>>> Regards
>>>> Jialei
>>>>
>>>> From: Jacek Laskowski
>>>> Date: Wednesday, June 26, 2019 at 11:00 AM
>>>> To: "Rong, Jialei"
>>>> Cc: "user @spark"
>>>> Subject: Re: Change parallelism number in Spark Streaming
>>>>
>>>> Hi,
>>>>
>>>> It's not allowed to change the number of partitions after your streaming query is started.
>>>>
>>>> The reason is exactly the number of state stores, which is exactly the number of partitions (perhaps multiplied by the number of stateful operators).
>>>>
>>>> I think you'll even get a warning or an exception when you change it after restarting the query.
>>>>
>>>> The number of partitions is stored in a checkpoint location.
>>>>
>>>> Jacek
>>>>
>>>> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, wrote:
>>>>
>>>> Hi Dear Spark Expert
>>>>
>>>> I’m curious about a question regarding Spark Streaming/Structured Streaming: does it allow changing the parallelism number (the default one or the one specified in a particular operator) in a stream that has a stateful transform/operator? Will this cause my checkpointed state to get messed up?
>>>>
>>>> Regards
>>>> Jialei
>>>
>>> --
>>> Name : Jungtaek Lim
>>> Blog : http://medium.com/@heartsavior
>>> Twitter : http://twitter.com/heartsavior
>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
Re: Change parallelism number in Spark Streaming
Hi Jungtaek,

That's very helpful to have the state source. As a matter of fact I've just this week been working on a similar tool (!) and have been wondering how to recreate the schema of the state key and value. You've helped me a lot. Thanks.

Jacek

On Wed, 26 Jun 2019, 23:58 Jungtaek Lim, wrote:

> Hi,
>
> you could consider the state operator's partition number as the "max parallelism", as parallelism can be reduced by applying coalesce. It would effectively work similarly to key groups.
>
> If you're also considering offline queries, there's a tool to manipulate state which enables reading and writing state in Structured Streaming, achieving rescaling and schema evolution.
>
> https://github.com/HeartSaVioR/spark-state-tools
> (DISCLAIMER: I'm an author of this tool.)
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei wrote:
>
>> Thank you for your quick reply!
>>
>> Is there any plan to improve this?
>>
>> I asked this question due to some investigation on comparing those state-of-the-art streaming systems, among which Flink and DataFlow allow changing the parallelism number, and by my knowledge of Spark Streaming, it seems it is also able to do that: if some “key interval” concept is used, then state can somehow be decoupled from the partition number by consistent hashing.
>>
>> Regards
>> Jialei
>>
>> From: Jacek Laskowski
>> Date: Wednesday, June 26, 2019 at 11:00 AM
>> To: "Rong, Jialei"
>> Cc: "user @spark"
>> Subject: Re: Change parallelism number in Spark Streaming
>>
>> Hi,
>>
>> It's not allowed to change the number of partitions after your streaming query is started.
>>
>> The reason is exactly the number of state stores, which is exactly the number of partitions (perhaps multiplied by the number of stateful operators).
>>
>> I think you'll even get a warning or an exception when you change it after restarting the query.
>>
>> The number of partitions is stored in a checkpoint location.
>>
>> Jacek
>>
>> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, wrote:
>>
>> Hi Dear Spark Expert
>>
>> I’m curious about a question regarding Spark Streaming/Structured Streaming: does it allow changing the parallelism number (the default one or the one specified in a particular operator) in a stream that has a stateful transform/operator? Will this cause my checkpointed state to get messed up?
>>
>> Regards
>> Jialei
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
Re: Change parallelism number in Spark Streaming
Hi,

No idea. I've just begun exploring the current state of state management in Spark Structured Streaming. I wouldn't be surprised if what you're after were not possible. Stateful stream processing in SSS is fairly young.

Jacek

On Wed, 26 Jun 2019, 21:48 Rong, Jialei, wrote:

> Thank you for your quick reply!
>
> Is there any plan to improve this?
>
> I asked this question due to some investigation on comparing those state-of-the-art streaming systems, among which Flink and DataFlow allow changing the parallelism number, and by my knowledge of Spark Streaming, it seems it is also able to do that: if some “key interval” concept is used, then state can somehow be decoupled from the partition number by consistent hashing.
>
> Regards
> Jialei
>
> From: Jacek Laskowski
> Date: Wednesday, June 26, 2019 at 11:00 AM
> To: "Rong, Jialei"
> Cc: "user @spark"
> Subject: Re: Change parallelism number in Spark Streaming
>
> Hi,
>
> It's not allowed to change the number of partitions after your streaming query is started.
>
> The reason is exactly the number of state stores, which is exactly the number of partitions (perhaps multiplied by the number of stateful operators).
>
> I think you'll even get a warning or an exception when you change it after restarting the query.
>
> The number of partitions is stored in a checkpoint location.
>
> Jacek
>
> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, wrote:
>
> Hi Dear Spark Expert
>
> I’m curious about a question regarding Spark Streaming/Structured Streaming: does it allow changing the parallelism number (the default one or the one specified in a particular operator) in a stream that has a stateful transform/operator? Will this cause my checkpointed state to get messed up?
>
> Regards
> Jialei
Re: Change parallelism number in Spark Streaming
Hi,

It's not allowed to change the number of partitions after your streaming query is started.

The reason is exactly the number of state stores, which is exactly the number of partitions (perhaps multiplied by the number of stateful operators).

I think you'll even get a warning or an exception when you change it after restarting the query.

The number of partitions is stored in a checkpoint location.

Jacek

On Wed, 26 Jun 2019, 19:30 Rong, Jialei, wrote:

> Hi Dear Spark Expert
>
> I’m curious about a question regarding Spark Streaming/Structured Streaming: does it allow changing the parallelism number (the default one or the one specified in a particular operator) in a stream that has a stateful transform/operator? Will this cause my checkpointed state to get messed up?
>
> Regards
> Jialei
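To make the point about the checkpoint concrete, here is a minimal sketch of a stateful streaming query. It assumes the built-in rate source and a throwaway temporary checkpoint directory, both chosen only for illustration. As far as I know, the value of spark.sql.shuffle.partitions in effect at the first start is what determines the number of state store partitions, and it is recorded in the checkpoint and reused on restart.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for illustration; 4 shuffle partitions means 4 state store partitions
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("stateful-partitions-demo")
  .config("spark.sql.shuffle.partitions", "4")
  .getOrCreate()
import spark.implicits._

// Throwaway checkpoint location (hypothetical, use a durable path in a real job)
val checkpointDir = Files.createTempDirectory("stateful-demo-checkpoint").toString

// Streaming aggregation is a stateful operator, so it is backed by state stores
val counts = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .groupBy(($"value" % 10).as("bucket"))
  .count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", checkpointDir)
  .start()

// Let a few micro-batches run, then stop
query.awaitTermination(10000)
query.stop()
```

Restarting the same query with the same checkpointLocation but a different spark.sql.shuffle.partitions is the experiment to try in order to see the warning mentioned above.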
Re: Spark 2.2 With Column usage
Hi,

Why are you doing the following two lines?

    .select("id", lit(referenceFiltered))
    .selectExpr("id")

What are you trying to achieve? What's lit and what's referenceFiltered? What's the difference between select and selectExpr?

Please start at http://spark.apache.org/docs/latest/sql-programming-guide.html and then hop onto http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package to get to know the Spark API better. I'm sure you'll quickly find the answer(s).

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski

On Sat, Jun 8, 2019 at 12:53 PM anbutech wrote:

> Thanks Jacek Laskowski Sir, but I didn't get the point here.
>
> Please advise: is the one below what you are expecting?
>
> dataset1.as("t1")
>   .join(dataset3.as("t2"), col("t1.col1") === col("t2.col1"), JOINTYPE.Inner)
>   .join(dataset4.as("t3"), col("t3.col1") === col("t1.col1"), JOINTYPE.Inner)
>   .select("id", lit(referenceFiltered))
>   .selectExpr("id")
Re: Spark logging questions
Hi,

What are "the spark driver and executor threads information" and "spark application logging"?

Spark uses log4j, so set up logging levels appropriately and you should be done.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski

On Fri, Jun 7, 2019 at 1:13 PM test test wrote:

> Hello,
>
> How can we dump the Spark driver and executor thread information in the Spark application logs?
>
> PS: submitting the Spark job using spark-submit
>
> Regards
> Rohit
Re: Spark 2.2 With Column usage
Hi,

> val referenceFiltered = dataset2.filter(_.dataDate == date).filter(_.someColumn).select("id").toString
> .withColumn("new_column",lit(referenceFiltered))

That won't work since lit is a function (adapter) to convert Scala values to Catalyst expressions.

Unless I'm mistaken, in your case, what you really need is to replace `withColumn` with `select("id")` itself and you're done.

As I'm writing this, I realize I'm saying exactly what you already have, and I'm feeling confused.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski

On Sat, Jun 8, 2019 at 6:05 AM anbutech wrote:

> Hi Sir,
>
> Could you please advise how to fix the issue below with withColumn in joins (Spark 2.2, Scala 2.11)?
>
> def processing(spark: SparkSession,
>     dataset1: Dataset[Reference],
>     dataset2: Dataset[DataCore],
>     dataset3: Dataset[ThirdPartyData],
>     dataset4: Dataset[OtherData],
>     date: String): Dataset[DataMerge] = {
>
>   val referenceFiltered = dataset2.filter(_.dataDate == date).filter(_.someColumn).select("id").toString
>
>   dataset1.as("t1")
>     .join(dataset3.as("t2"), col("t1.col1") === col("t2.col1"), JOINTYPE.Inner)
>     .join(dataset4.as("t3"), col("t3.col1") === col("t1.col1"), JOINTYPE.Inner)
>     .withColumn("new_column", lit(referenceFiltered))
>     .selectExpr(
>       "id", // ---> want to get this value
>       "column1",
>       "column2",
>       "column3",
>       "column4")
> }
>
> How do I get the String value, let's say the value "124567" ("referenceFiltered"), inside the withColumn?
>
> I'm getting the withColumn output as "id:BigInt". I want to get the same value for all the records.
>
> Note:
>
> I have been asked not to use a cross join in the code. Is there any other way to fix this issue?
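A minimal sketch of one way to get the behaviour asked for, under the assumption that the filter matches exactly one row. Calling toString on a Dataset only returns a description of its schema (something like "[id: bigint]"), which would explain the "id:BigInt" output. The value has to be brought to the driver first (here with head()) and only then wrapped with lit. The data, column names and the local SparkSession below are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

// Local session for illustration only (in spark-shell, `spark` already exists)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("lit-scalar-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for dataset2 and for the result of the joins
val reference = Seq((124567L, "2019-06-08"), (999999L, "2019-06-07")).toDF("id", "dataDate")
val joined = Seq(("a", "b"), ("c", "d")).toDF("column1", "column2")

// Run the filter and fetch the single value to the driver as a plain Scala String.
// head() throws if the filter matches no rows, so guard for that in real code.
val referenceId: String = reference
  .filter($"dataDate" === "2019-06-08")
  .select($"id".cast("string"))
  .as[String]
  .head()

// lit turns the Scala value into a constant column, the same for every row
val result = joined.withColumn("new_column", lit(referenceId))
result.show()
```

This avoids a cross join, at the cost of running a separate small job to fetch the value before the main query is defined.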
[SQL] Why casting string column to timestamp gives null?
Hi,

Why is casting a string column to timestamp not giving the same results as going through casting to long in-between? I'm tempted to consider it a bug.

    scala> spark.version
    res4: String = 2.4.3

    scala> Seq("1", "2").toDF("ts").select($"ts" cast "timestamp").show
    +----+
    |  ts|
    +----+
    |null|
    |null|
    +----+

    scala> Seq("1", "2").toDF("ts").select($"ts" cast "long").select($"ts" cast "timestamp").show
    +-------------------+
    |                 ts|
    +-------------------+
    |1970-01-01 01:00:01|
    |1970-01-01 01:00:02|
    +-------------------+

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski
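A sketch of the two cast paths side by side, for anyone who wants to reproduce this. My understanding is that a string is parsed as a date-time literal when cast to timestamp (and "1" is not a valid one), whereas a long is interpreted as seconds since the epoch, which is why the two paths differ. The local SparkSession is an assumption for illustration, and the rendered timestamps depend on the session time zone.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only (in spark-shell, `spark` already exists)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cast-timestamp-demo")
  .getOrCreate()
import spark.implicits._

val df = Seq("1", "2").toDF("ts")

// Path 1: string -> timestamp, the string is treated as a date-time literal
val direct = df.select($"ts".cast("timestamp").as("ts"))
direct.printSchema()

// Path 2: string -> long -> timestamp, the number is treated as epoch seconds
val viaLong = df.select($"ts".cast("long").cast("timestamp").as("ts"))
viaLong.show()

// Round-trip back to epoch seconds to check the values independently of time zone
val seconds = viaLong.select($"ts".cast("long")).as[Long].collect().toSeq
println(seconds)
```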
Re: What is the difference for the following UDFs?
Hi,

For this particular case I'd use Column.substr (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column), e.g.

    val ns = Seq(("hello world", 1, 5)).toDF("w", "b", "e")

    scala> ns.select($"w".substr($"b", $"e" - $"b" + 1) as "demo").show
    +-----+
    | demo|
    +-----+
    |hello|
    +-----+

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Tue, May 14, 2019 at 5:08 PM Qian He wrote:

> For example, I have a dataframe with 3 columns: URL, START, END. For each url from the URL column, I want to fetch a substring of it starting from START and ending at END.
>
> +--------------+-----+---+
> |URL           |START|END|
> +--------------+-----+---+
> |www.amazon.com|4    |14 |
> |www.yahoo.com |4    |13 |
> |www.amazon.com|4    |14 |
> |www.google.com|4    |14 |
> +--------------+-----+---+
>
> I have UDF1:
>
> def getSubString = (input: String, start: Int, end: Int) => {
>   input.substring(start, end)
> }
> val udf1 = udf(getSubString)
>
> and another UDF2:
>
> def getColSubString()(c1: Column, c2: Column, c3: Column): Column = {
>   c1.substr(c2, c3-c2)
> }
>
> Let's assume they can both generate the result I want. But, from a performance perspective, is there any difference between those two UDFs?
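A small sketch comparing the two approaches on the sample data from the question, assuming a local SparkSession. Two things worth noting: the second function in the question is not really a UDF but a method that builds a Column expression, which Catalyst can see through and optimize, whereas a real UDF is a black box to the optimizer. Also, String.substring is 0-based with an exclusive end, while Column.substr takes a 1-based position and a length, so the arguments have to be shifted for the two to agree.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Local session for illustration only (in spark-shell, `spark` already exists)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("substr-vs-udf-demo")
  .getOrCreate()
import spark.implicits._

val urls = Seq(("www.amazon.com", 4, 14), ("www.yahoo.com", 4, 13)).toDF("URL", "START", "END")

// UDF: 0-based start, exclusive end (java.lang.String.substring semantics)
val substringUdf = udf((input: String, start: Int, end: Int) => input.substring(start, end))
val viaUdf = urls.select(substringUdf($"URL", $"START", $"END").as("sub"))

// Native expression: 1-based position and a length, hence START + 1 and END - START
val viaColumn = urls.select($"URL".substr($"START" + 1, $"END" - $"START").as("sub"))

viaUdf.show()
viaColumn.show()

// Compare the physical plans: the UDF shows up as an opaque function call
viaUdf.explain()
viaColumn.explain()
```

With the arguments used as-is, c1.substr(c2, c3-c2) from the question would start one character earlier than the UDF version, so the two would not return the same substring.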
Re: Spark SQL met "Block broadcast_xxx not found"
Hi,

I'm curious about "I found the bug code". Can you point me at it? Thanks.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Tue, May 7, 2019 at 9:34 AM Xilang Yan wrote:

> Ok... I am sure it is a bug in Spark. I found the bug code, but the code is removed in 2.2.3, so I just upgraded Spark to fix the problem.
Re: How to use same SparkSession in another app?
Hi,

Not possible. What are you really trying to do? Why do you need to share dataframes? They're nothing but metadata of a distributed computation (no data inside), so what would be the purpose of such sharing?

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Tue, Apr 16, 2019 at 1:57 PM Rishikesh Gawade wrote:

> Hi.
> I wish to use a SparkSession created by one app in another app so that I can use the dataframes belonging to that session. Is it possible to use the same SparkSession in another app?
> Thanks,
> Rishikesh
Re: Observing DAGScheduler Log Messages
Hi,

Add the following line to conf/log4j.properties and you should have all the logs:

    log4j.logger.org.apache.spark.scheduler.DAGScheduler=ALL

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Sun, Apr 7, 2019 at 6:05 PM M Bilal wrote:

> Hi,
>
> I want to observe the log messages from DAGScheduler in Apache Spark. Which log files do I need to check?
> I have tried observing the driver logs and worker stderr logs but I can't find any messages that are from that class.
>
> I am using Spark 3.0.0 snapshot in standalone mode.
>
> Thanks.
>
> Regards,
> Bilal
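If editing conf/log4j.properties is inconvenient, the same logger level can probably be set from code. The sketch below assumes the log4j 1.x API is on the classpath, which to my knowledge is the case for Spark up to 3.2 (later versions moved to log4j 2, where this call may behave differently). DAGScheduler runs in the driver, so the code has to run in the driver JVM and the messages end up in the driver's log output, not in the worker stderr files.

```scala
import org.apache.log4j.{Level, Logger}

// Same logger name as in the log4j.properties line above
val dagSchedulerLogger = Logger.getLogger("org.apache.spark.scheduler.DAGScheduler")

// Equivalent of log4j.logger.org.apache.spark.scheduler.DAGScheduler=ALL
dagSchedulerLogger.setLevel(Level.ALL)

println(s"DAGScheduler logger level is now ${dagSchedulerLogger.getLevel}")
```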
Re: Spark 2.4 partitions and tasks
Hi,

Can you show the plans with explain(extended=true) for both versions? That's where I'd start to pinpoint the issue. Perhaps the underlying execution engine changed in a way that affects keyBy? Dunno and guessing...

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Fri, Feb 8, 2019 at 5:09 PM Pedro Tuero wrote:

> I did a repartition to 1 (hardcoded) before the keyBy and it ends in 1.2 minutes.
> The questions remain open, because I don't want to hardcode parallelism.
>
> On Fri, Feb 8, 2019 at 12:50, Pedro Tuero (tuerope...@gmail.com) wrote:
>
>> 128 is the default parallelism defined for the cluster.
>> The question now is why the keyBy operation is using the default parallelism instead of the number of partitions of the RDD created by the previous step (5580).
>> Any clues?
>>
>> On Thu, Feb 7, 2019 at 15:30, Pedro Tuero (tuerope...@gmail.com) wrote:
>>
>>> Hi,
>>> I am running a job in Spark (using AWS EMR) and some stages are taking a lot longer using Spark 2.4 instead of Spark 2.3.1:
>>>
>>> Spark 2.4:
>>> [image: image.png]
>>>
>>> Spark 2.3.1:
>>> [image: image.png]
>>>
>>> With Spark 2.4, the keyBy operation takes more than 10X what it took with Spark 2.3.1.
>>> It seems to be related to the number of tasks / partitions.
>>>
>>> Questions:
>>> - Isn't the number of tasks of a job supposed to be related to the number of partitions of the RDD left by the previous job? Did that change in version 2.4?
>>> - Which tools / configuration may I try to reduce this aberrant degradation of performance?
>>>
>>> Thanks.
>>> Pedro.
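A sketch of how one might inspect this, assuming a local SparkSession and made-up data. explain(extended = true) applies to Datasets; for the RDD API (where keyBy lives) the closest equivalents I know of are getNumPartitions and toDebugString. Since keyBy is a narrow transformation, it should keep the partition count of its parent, so a change in the task count is more likely to come from an earlier or later step in the lineage.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only (in spark-shell, `spark` already exists)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("keyby-partitions-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical input RDD with an explicit number of partitions
val numbers = spark.sparkContext.parallelize(1 to 100, numSlices = 8)

// keyBy only maps each element to (key, element), so no shuffle happens here
val keyed = numbers.keyBy(_ % 10)

println(s"parent partitions: ${numbers.getNumPartitions}")
println(s"keyed partitions:  ${keyed.getNumPartitions}")

// RDD lineage with partition counts per step
println(keyed.toDebugString)

// Dataset counterpart: parsed, analyzed, optimized and physical plans
spark.range(100).groupBy(($"id" % 10).as("k")).count().explain(extended = true)
```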
Re: structured streaming handling validation and json flattening
Hi Lian,

"What have you tried?" would be a good starting point. Any help on this?

How do you read the JSONs? readStream.json? You could use readStream.text followed by filter to include/exclude good/bad JSONs.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Sat, Feb 9, 2019 at 8:25 PM Lian Jiang wrote:

> Hi,
>
> We have a structured streaming job that converts JSON into Parquet files. We want to validate the JSON records. If a JSON record is not valid, we want to log a message and refuse to write it into the Parquet file. Also, the JSON has nested JSON objects and we want to flatten them into other Parquet files using the same streaming job. My questions are:
>
> 1. How to validate the JSON records in a structured streaming job?
> 2. How to flatten the nested JSON objects in a structured streaming job?
> 3. Is it possible to use one structured streaming job to validate JSON, convert JSON into Parquet and convert nested JSON objects into other Parquet files?
>
> I think unstructured streaming can achieve these goals, but structured streaming is recommended by the Spark community.
>
> Appreciate your feedback!
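A minimal sketch of the "read as text, then filter" idea. The schema, the field names and the rule that a record is valid only if id parses are all assumptions made for illustration. The functions take a plain DataFrame with a value column, which is what spark.readStream.text(path) would produce, so the same code should apply to a streaming DataFrame; the demo at the bottom uses a small batch DataFrame to keep it runnable.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Local session for illustration only (in spark-shell, `spark` already exists)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("json-validation-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical schema with one nested object
val schema = new StructType()
  .add("id", LongType)
  .add("payload", new StructType().add("name", StringType))

// Keep the raw line next to the parsed struct so bad records can be logged
def parse(lines: DataFrame): DataFrame =
  lines.select(col("value"), from_json(col("value"), schema).as("json"))

// Valid records: required field parsed; nested fields flattened with dot notation
def validRecords(parsed: DataFrame): DataFrame =
  parsed.filter(col("json.id").isNotNull)
    .select(col("json.id").as("id"), col("json.payload.name").as("name"))

// Invalid records: malformed JSON or missing required field
def invalidRecords(parsed: DataFrame): DataFrame =
  parsed.filter(col("json.id").isNull).select("value")

// Batch stand-in for spark.readStream.text("/path/to/json")
val lines = Seq(
  """{"id": 1, "payload": {"name": "a"}}""",
  "not json at all",
  """{"payload": {"name": "b"}}"""
).toDF("value")

val parsed = parse(lines)
validRecords(parsed).show()
invalidRecords(parsed).show(truncate = false)
```

In a streaming job each of the two results would get its own writeStream (for example Parquet for the valid records and a log or side location for the invalid ones), each with its own checkpoint location.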
Re: Where is the DAG stored before catalyst gets it?
Hi Jean Georges,

> I am assuming it is still in the master and when catalyst is finished it sends the tasks to the workers.

Sorry to be that direct, but the sentence does not make much sense to me. Again, very sorry for saying it in the very first sentence. Since I know Jean Georges, I allowed myself more openness.

In other words, "the master" part seems to suggest that you use a Spark Standalone cluster. Correct? Other cluster managers use different names for the master/manager node.

"when catalyst is finished": that one is really tough to understand. You mean once all the optimizations are applied and the query is ready for execution? The final output of the "query execution pipeline" is an RDD with the right code for execution. At this phase, the query is more an RDD than a Dataset.

"it sends the tasks to the workers.": since we're talking about an RDD, this abstraction is planned as a set of tasks (one per partition of the RDD). And yes, the tasks are sent out over the wire to executors. It's been like this since Spark 1.0 (and even earlier).

Hope I helped a bit.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Fri, Oct 5, 2018 at 12:36 AM Jean Georges Perrin wrote:

> Hi,
>
> I am assuming it is still in the master and when catalyst is finished it sends the tasks to the workers.
>
> Correct?
>
> tia
>
> jg
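The pipeline described above can be poked at from the driver. The sketch below assumes a local SparkSession and a made-up query; it uses Dataset.queryExecution to look at the optimized and physical plans and at the RDD that comes out at the end, whose partitions correspond to the tasks of the final stage.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only (in spark-shell, `spark` already exists)
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("query-execution-demo")
  .getOrCreate()
import spark.implicits._

// Hypothetical query: 100 numbers in 4 partitions, keep the even ones
val query = spark.range(0, 100, 1, 4).filter($"id" % 2 === 0)

// All of this lives on the driver; nothing has been sent to executors yet
val qe = query.queryExecution
println(qe.optimizedPlan) // logical plan after Catalyst optimizations
println(qe.executedPlan)  // physical plan

// End of the pipeline: an RDD of internal rows
val rdd = qe.toRdd
println(s"partitions (one task each): ${rdd.getNumPartitions}")

// Only an action makes the scheduler create tasks and ship them to executors
println(query.count())
```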
Re: Spark code to write to MySQL and Hive
Hi, I haven't checked my answer (too lazy today), but think I know what might be going on. tl;dr Use cache to preserve the initial set of rows from mysql After you append new rows, you will have twice as many rows as you had previously. Correct? Since newDF references the table every time you use it in a structured query, say to write it to a table, the source table will get re-loaded and hence the number of rows changes. What you should do is to execute newDF.cache.count right after val newDF = mysqlDF.select... so the data (rows) remains on executors and won't get reloaded. Hope that helps. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Wed, Aug 29, 2018 at 4:59 PM wrote: > Sorry, last mail format was not good. > > > > *println*(*"Going to talk to mySql"*) > > > *// Read table from mySQL.**val *mysqlDF = spark.read.jdbc(jdbcUrl, > table, properties) > *println*(*"I am back from mySql"*) > > mysqlDF.show() > > > *// Create a new Dataframe with column 'id' increased to avoid Duplicate > primary keys**val *newDF = mysqlDF.select((*col*(*"id"*) + 10).as(*"id"*), > *col*(*"country"*), *col*(*"city"*)) > newDF.printSchema() > newDF.show() > > > *// Insert records into the table.*newDF.write > .mode(SaveMode.*Append*) > .jdbc(jdbcUrl, table, properties) > > > *// Write to Hive - This Creates a new table.*newDF.write.saveAsTable( > *"cities"*) > newDF.show() > > > > > > > > Going to talk to mySql > > I am back from mySql > > +---+--+-+ > > | id| country| city| > > +---+--+-+ > > | 1| USA|Palo Alto| > > | 2|Czech Republic| Brno| > > | 3| USA|Sunnyvale| > > | 4| null| null| > > +---+--+-+ > > > > root > > |-- id: long (nullable = false) > > |-- country: string (nullable = true) > > |-- city: string (nullable = true) > > > > 
+---+--+-+ > > | id| country| city| > > +---+--+-+ > > | 11| USA|Palo Alto| > > | 12|Czech Republic| Brno| > > | 13| USA|Sunnyvale| > > | 14| null| null| > > +---+--+-+ > > > > +---+--+-+ > > | id| country| city| > > +---+--+-+ > > | 11| USA|Palo Alto| > > | 12|Czech Republic| Brno| > > | 13| USA|Sunnyvale| > > | 14| null| null| > > | 24| null| null| > > | 23| USA|Sunnyvale| > > | 22|Czech Republic| Brno| > > | 21| USA|Palo Alto| > > +---+--+-+ > > > > Thanks, > > Ravi > > > > *From:* ryanda...@gmail.com > *Sent:* Wednesday, August 29, 2018 8:19 PM > *To:* user@spark.apache.org > *Subject:* Spark code to write to MySQL and Hive > > > > Hi, > > > > Can anyone help me to understand what is happening with my code ? > > > > I wrote a Spark application to read from a MySQL table [that already has 4 > records], Create a new DF by adding 10 to the ID field. Then, I wanted to > write the new DF to MySQL as well as to Hive. > > > > I am surprised to see additional set of records in Hive !! I am not able > to understand how the *newDF *has records with IDs 21 to 24. I know that > a DF is immutable. If so, how come it has 4 records at one point and 8 > records at later point ? > > > > > *// Read table from mySQL.**val *mysqlDF = spark.read.jdbc(jdbcUrl, > table, properties) > *println*(*"I am back from mySql"*) > > > > > > > > > > mysqlDF.show() > > > > > > > > > > > > > *// Create a new Dataframe with column 'id' increased to avoid Duplicate > primary keys**val *newDF = mysqlDF.select((*col*(*"id"*) + 10).as(*"id"*), > *col*(*"country"*), *col*(*"city"*)) > newDF.printSchema() > newDF.show() > > > > > > > *// Insert records into the MySQL table.*newDF.write > .mode(SaveMode.*Append*) > .jdbc(jdbcUrl, table, properties) > > > > > *// Write to Hive - This Creates a new table.*newDF.write.saveAsTable( > *"cities"*) > newDF.show() > > > > >
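A sketch of the cache suggestion from the reply above, keeping the `jdbcUrl`, `table` and `properties` names from the quoted code. The helpers `snapshot` and `copyWithShiftedIds` are made up for illustration. One caveat worth stating loudly: caching is best effort, so if cached partitions are evicted or an executor is lost, Spark may read the (now larger) MySQL table again. Writing to Hive first and appending to MySQL from the Hive table would be a stricter alternative.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

// Caches df and forces it to be computed once, so later actions reuse the cached rows.
def snapshot(df: DataFrame): DataFrame = {
  val cached = df.cache()
  cached.count()   // an action is needed, cache() alone is lazy
  cached
}

def copyWithShiftedIds(
    spark: SparkSession,
    jdbcUrl: String,
    table: String,
    properties: Properties): Unit = {
  val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
  // Materialize the 4 shifted rows BEFORE anything is appended to the source table.
  val newDF = snapshot(
    mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city")))
  newDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, table, properties)
  newDF.write.saveAsTable("cities")   // reuses the cached rows instead of re-reading MySQL
  newDF.unpersist()
}
```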
Re: Bug in Window Function
Hi Elior, Could you show the query that led to the exception? Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Wed, Jul 25, 2018 at 10:04 AM, Elior Malul wrote: > Exception in thread "main" org.apache.spark.sql.AnalysisException: > collect_set(named_struct(value, country#123 AS value#346, count, > (cast(count(country#123) windowspecdefinit ion(campaign_id#104, > app_id#93, country#123, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED > FOLLOWING) as double) / cast(count(1) windowspecdefinition(campaign_id#104, > app_id #93, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) > as double)) AS count#349) AS histogram_country#350, 0, 0) > windowspecdefinition(campaign_id#104, app_id#93, ROWS BETWEEN > UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS > collect_set(named_struct(NamePlaceholder(), > country AS `value`, NamePlaceholder(), (CAST(count(country) OVER (PARTITI > ON BY campaign_id, app_id, country UnspecifiedFrame) AS DOUBLE) / > CAST(count(1) OVER (PARTITION BY campaign_id, app_id UnspecifiedFrame) AS > DOUBLE)) AS `count`) AS `histogram _country`) OVER (PARTITION BY > campaign_id, app_id UnspecifiedFrame)#352 has multiple Window > Specifications (ArrayBuffer(windowspecdefinition(campaign_id#104, > app_id#93, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), > windowspecdefinition(campaign_id#104, app_id#93, country#123, ROWS > BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) ).Please file a > bug report with this error message, stack trace, and the query.; >
Re: Spark 2.4 release date
Hi, What about https://issues.apache.org/jira/projects/SPARK/versions/12342385? Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Mon, Jun 18, 2018 at 9:41 PM, Li Gao wrote: > Hello, > > Do we know the estimate when Spark 2.4 will be GA? > We are evaluating whether to back port some of 2.4 fixes into our 2.3 > deployment. > > Thank you. >
Re: Spark Structured Streaming is giving error “org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”
Hi, Once you leave Spark Structured Streaming and drop down to the RDDs generated for your streaming queries, you can do any kind of "joins". You're back in the good old days of RDD programming (with all the bells and whistles). Please note that Spark Structured Streaming != Spark Streaming: the former uses the Dataset API while the latter uses the RDD API. Don't touch the RDD API and Spark Streaming unless you know what you're doing :) Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Tue, May 15, 2018 at 5:36 PM, ☼ R Nair (रविशंकर नायर) < ravishankar.n...@gmail.com> wrote: > Hi Jacek, > > If we use RDD instead of Dataframe, can we accomplish the same? I mean, is > joining between RDDs allowed in Spark Streaming? > > Best, > Ravi > > On Sun, May 13, 2018 at 11:18 AM Jacek Laskowski <ja...@japila.pl> wrote: > >> Hi, >> >> The exception message should be self-explanatory and says that you cannot >> join two streaming Datasets. This feature was added in 2.3 if I'm not >> mistaken. >> >> Just to be sure that you work with two streaming Datasets, can you show >> the query plan of the join query? >> >> Jacek >> >> On Sat, 12 May 2018, 16:57 ThomasThomas, <thomaspt...@gmail.com> wrote: >> >>> Hi There, >>> >>> Our use case is like this. >>> >>> We have a nested(multiple) JSON message flowing through Kafka Queue. >>> Read >>> the message from Kafka using Spark Structured Streaming(SSS) and explode >>> the data and flatten all data into single record using DataFrame joins >>> and >>> land into a relational database table(DB2). >>> >>> But we are getting the following error when we write into db using JDBC. 
>>> >>> “org.apache.spark.sql.AnalysisException: Inner join between two >>> streaming >>> DataFrames/Datasets is not supported;” >>> >>> Any help would be greatly appreciated. >>> >>> Thanks, >>> Thomas Thomas >>> Mastermind Solutions LLC. >>> >>> >>> >>> -- >>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ >>> >>> - >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>> >>>
Re: help with streaming batch interval question needed
Hi Peter, > Basically I need to find a way to set the batch-interval in (b), similar as in (a) below. That's trigger method on DataStreamWriter. http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter import org.apache.spark.sql.streaming.Trigger df.writeStream.trigger(Trigger.ProcessingTime("1 second")) See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Thu, May 24, 2018 at 10:14 PM, Peter Liu <peter.p...@gmail.com> wrote: > Hi there, > > from my apache spark streaming website (see links below), > >- the batch-interval is set when a spark StreamingContext is >constructed (see example (a) quoted below) >- the StreamingContext is available in older and new Spark version >(v1.6, v2.2 to v2.3.0) (see https://spark.apache.org/docs/ >1.6.0/streaming-programming-guide.html ><https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html> >and https://spark.apache.org/docs/2.3.0/streaming-programming- >guide.html ) >- however, example (b) below doesn't use StreamingContext, but >StreamingSession object to setup a streaming flow; > > What does the usage difference in (a) and (b) mean? I was wondering if > this would mean a different streaming approach ("traditional" streaming vs > structured streaming? > > Basically I need to find a way to set the batch-interval in (b), similar > as in (a) below. > > Would be great if someone can please share some insights here. > > Thanks! 
> > Peter > > (a) > https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html ) > > import org.apache.spark._import org.apache.spark.streaming._ > val conf = new SparkConf().setAppName(appName).setMaster(master)val *ssc *= > new StreamingContext(conf, Seconds(1)) > > > (b) > ( from databricks' https://databricks.com/blog/ > 2017/04/26/processing-data-in-apache-kafka-with-structured- > streaming-in-apache-spark-2-2.html) > >val *spark *= SparkSession.builder() > .appName(appName) > .getOrCreate() > ... > > jsonOptions = { "timestampFormat": nestTimestampFormat } > parsed = *spark *\ > .readStream \ > .format("kafka") \ > .option("kafka.bootstrap.servers", "localhost:9092") \ > .option("subscribe", "nest-logs") \ > .load() \ > .select(from_json(col("value").cast("string"), schema, > jsonOptions).alias("parsed_value")) > > > > >
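For completeness, a slightly fuller sketch of the trigger call from the reply above, applied to an arbitrary streaming DataFrame. The helper name `startWithInterval` and the choice of the console sink are assumptions for the example; any sink would do.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Starts a streaming query that checks for new data at the given interval.
// This is the closest Structured Streaming equivalent of the batch interval
// passed to StreamingContext in example (a).
def startWithInterval(df: DataFrame, interval: String): StreamingQuery =
  df.writeStream
    .format("console")                           // sink chosen only for illustration
    .trigger(Trigger.ProcessingTime(interval))   // e.g. "1 second"
    .start()
```

With the `parsed` DataFrame from example (b) this would be `startWithInterval(parsed, "1 second")`. If no trigger is set, the next micro-batch starts as soon as the previous one has finished.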
Re: Spark Structured Streaming is giving error “org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”
Hi, The exception message should be self-explanatory and says that you cannot join two streaming Datasets. This feature was added in 2.3 if I'm not mistaken. Just to be sure that you work with two streaming Datasets, can you show the query plan of the join query? Jacek On Sat, 12 May 2018, 16:57 ThomasThomas,wrote: > Hi There, > > Our use case is like this. > > We have a nested(multiple) JSON message flowing through Kafka Queue. Read > the message from Kafka using Spark Structured Streaming(SSS) and explode > the data and flatten all data into single record using DataFrame joins and > land into a relational database table(DB2). > > But we are getting the following error when we write into db using JDBC. > > “org.apache.spark.sql.AnalysisException: Inner join between two streaming > DataFrames/Datasets is not supported;” > > Any help would be greatly appreciated. > > Thanks, > Thomas Thomas > Mastermind Solutions LLC. > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
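One way to answer the "are both sides really streaming?" question above is sketched below. The helper name `describeJoin` and the join key parameter are made up for the example; `isStreaming` and `explain` are regular Dataset methods.

```scala
import org.apache.spark.sql.DataFrame

// Reports whether both inputs are streaming Datasets and prints the plans of the join.
// Returns true when this is a stream-stream join (supported from Spark 2.3 on,
// as far as the thread above goes).
def describeJoin(left: DataFrame, right: DataFrame, key: String): Boolean = {
  println(s"left.isStreaming=${left.isStreaming}, right.isStreaming=${right.isStreaming}")
  val joined = left.join(right, Seq(key))   // inner join on the shared key column
  joined.explain(true)                      // parsed, analyzed, optimized and physical plans
  left.isStreaming && right.isStreaming
}
```

If only one side is streaming (for example, a static lookup table read with spark.read), it is a stream-static join, which should not hit this exception.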
Curious case of Spark SQL 2.3 - number of stages different for the same query ever?
Hi, I've got a case where the same structured query (it's a union) gives 1 stage in one run and 5 stages in another. I have not found any pattern yet (and it's hard to reproduce due to the data volume and the application), but I'm pretty certain that Spark 2.3 should *never* come up with 1 vs 5 stages for the very same query plan (even if I changed the number of executors, the number of cores or anything else execution-related). So my question is: is it possible for Spark SQL to produce a 1-stage execution plan and a 5-stage execution plan for the very same query? (I'm not 100% sure that the query is indeed the same; I'm still working on a reproducible test case and will only be certain once I have it.) Sorry for the vague description, but I've got nothing more to share yet. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski
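When comparing runs like the ones described above, a small listener can count stages per run so the numbers do not have to be read off the web UI. This is only a sketch; the class name `StageCounter` is made up, while `SparkListener` and `SparkListenerStageCompleted` are part of Spark Core. Skipped stages (for example, when shuffle output is reused) are not reported as completed, which could be one reason the counts differ between runs.

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Counts the stages that completed while the listener was registered.
class StageCounter extends SparkListener {
  val completed = new AtomicInteger(0)

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    completed.incrementAndGet()
  }
}
```

Register it with `spark.sparkContext.addSparkListener(counter)` before running the query. Listener events are delivered asynchronously, so the count should be read a moment after the action returns.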
Re: Run spark 2.2 on yarn as usual java application
Hi, What's the deployment process then (if not using spark-submit)? How is the AM deployed? Why would you want to skip spark-submit? Jacek On 19 Mar 2018 00:20, "Serega Sheypak" wrote: > Hi, Is it even possible to run Spark on YARN as a regular Java application? > I've built a jar using Maven with the spark-yarn dependency and I manually > populate SparkConf with all Hadoop properties. > SparkContext fails to start with the exception: > > Caused by: java.lang.IllegalStateException: Library directory '/hadoop/yarn/local/usercache/root/appcache/application_1521375636129_0022/container_e06_1521375636129_0022_01_02/assembly/target/scala-2.11/jars' does not exist; make sure Spark is built. > at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:260) > at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:359) > at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38) > > I took a look at the code and it has some hardcoded paths and checks for a > specific file layout. I don't follow why :) > Is it possible to bypass such checks? >
Re: NPE in Subexpression Elimination optimization
Hi, Filed https://issues.apache.org/jira/browse/SPARK-23731 and am working on a workaround (aka fix). Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Fri, Mar 16, 2018 at 3:56 PM, Jacek Laskowski <ja...@japila.pl> wrote: > Hi, > > I'm working on a minimal test to reproduce the NPE that is > thrown in the latest 2.3.0 and earlier 2.2.1 in subexpression elimination > optimization, and am sending it to the mailing list hoping someone notices > something familiar and would shed more light on what might be the root > cause and how to write a test. > > I know why Spark throws the NPE technically since there's this @transient > relation: HadoopFsRelation [1] that is not re-created at de-serialization > on executors, but don't know why this @transient is required in the first > place and more importantly how to write a test. > > Any hints appreciated. > > FYI Disabling subexpression elimination with the > spark.sql.subexpressionElimination.enabled Spark configuration property helps. 
> > [1] https://github.com/apache/spark/blob/branch-2.3/sql/ > core/src/main/scala/org/apache/spark/sql/execution/ > DataSourceScanExec.scala?utf8=%E2%9C%93#L159 > > Caused by: java.lang.NullPointerException > at org.apache.spark.sql.execution.FileSourceScanExec.< > init>(DataSourceScanExec.scala:167) > at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize( > DataSourceScanExec.scala:502) > at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize( > DataSourceScanExec.scala:158) > at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$ > lzycompute(QueryPlan.scala:210) > at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized( > QueryPlan.scala:209) > at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2. > apply(QueryPlan.scala:224) > at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2. > apply(QueryPlan.scala:224) > at scala.collection.TraversableLike$$anonfun$map$ > 1.apply(TraversableLike.scala:234) > at scala.collection.TraversableLike$$anonfun$map$ > 1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at org.apache.spark.sql.catalyst.plans.QueryPlan. > doCanonicalize(QueryPlan.scala:224) > at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$ > lzycompute(QueryPlan.scala:210) > at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized( > QueryPlan.scala:209) > at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2. > apply(QueryPlan.scala:224) > at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2. 
> apply(QueryPlan.scala:224) > at scala.collection.TraversableLike$$anonfun$map$ > 1.apply(TraversableLike.scala:234) > at scala.collection.TraversableLike$$anonfun$map$ > 1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at org.apache.spark.sql.catalyst.plans.QueryPlan. > doCanonicalize(QueryPlan.scala:224) > at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$ > lzycompute(QueryPlan.scala:210) > at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized( > QueryPlan.scala:209) > at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2. > apply(QueryPlan.scala:224) > at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2. > apply(QueryPlan.scala:224) > at scala.collection.TraversableLike$$anonfun$map$ > 1.apply(TraversableLike.scala:234) > at scala.collection.TraversableLike$$anonfun$map$ > 1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:285) > at org.apache.spark.sql.catalyst.plans.QueryPlan. > doCanonicalize(QueryPlan.scala:224) > at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$ > lzycompute(QueryPlan.scala:210) > at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized( > QueryPlan.scala:209) > at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult( > QueryPlan.scala:257) > at org.apache.spark.sql.execution.ScalarSubquery. > semanticEquals(subquery.sc
NPE in Subexpression Elimination optimization
) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski
Re: Are there any alternatives to Hive "stored by" clause as Spark 2.0 does not support it
Hi, Since I'm new to Hive, what does `stored by` do? I might help a bit in Spark if I only knew a bit about Hive :) Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Thu, Feb 8, 2018 at 7:25 AM, Pralabh Kumar <pralabhku...@gmail.com> wrote: > Hi > > Spark 2.0 doesn't support stored by . Is there any alternative to achieve > the same. > > >
Re: Apache Spark - Spark Structured Streaming - Watermark usage
Hi, What would you expect? The data is simply dropped as that's the purpose of watermarking it. That's my understanding at least. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Mon, Feb 5, 2018 at 8:11 PM, M Singh <mans2si...@yahoo.com> wrote: > Just checking if anyone has more details on how watermark works in cases > where event time is earlier than processing time stamp. > > > On Friday, February 2, 2018 8:47 AM, M Singh <mans2si...@yahoo.com> wrote: > > > Hi Vishu/Jacek: > > Thanks for your responses. > > Jacek - At the moment, the current time for my use case is processing time. > > Vishnu - Spark documentation (https://spark.apache.org/ > docs/latest/structured-streaming-programming-guide.html) does indicate > that it can dedup using watermark. So I believe there are more use cases > for watermark and that is what I am trying to find. > > I am hoping that TD can clarify or point me to the documentation. > > Thanks > > > On Wednesday, January 31, 2018 6:37 AM, Vishnu Viswanath < > vishnu.viswanat...@gmail.com> wrote: > > > Hi Mans, > > Watermark is Spark is used to decide when to clear the state, so if the > even it delayed more than when the state is cleared by Spark, then it will > be ignored. > I recently wrote a blog post on this : http://vishnuviswanath.com/ > spark_structured_streaming.html#watermark > > Yes, this State is applicable for aggregation only. If you are having only > a map function and don't want to process it, you could do a filter based on > its EventTime field, but I guess you will have to compare it with the > processing time since there is no API to access Watermark by the user. 
> > -Vishnu > > On Fri, Jan 26, 2018 at 1:14 PM, M Singh <mans2si...@yahoo.com.invalid> > wrote: > > Hi: > > I am trying to filter out records which are lagging behind (based on event > time) by a certain amount of time. > > Is the watermark api applicable to this scenario (ie, filtering lagging > records) or it is only applicable with aggregation ? I could not get a > clear understanding from the documentation which only refers to it's usage > with aggregation. > > Thanks > > Mans > > > > > > >
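As an illustration of what the thread above is discussing, here is a sketch of a watermark used with a windowed aggregation. The column name `eventTime`, the delay and the window size are assumptions for the example. As far as this thread goes, the watermark only affects stateful operators such as this aggregation (or dropDuplicates), so a plain map or filter query would not drop late rows by itself.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

// Counts events per 5-minute event-time window.
// Rows arriving more than 10 minutes behind the maximum event time seen so far
// are considered too late, and the state of old windows can be dropped.
def countPerWindow(events: DataFrame): DataFrame =
  events
    .withWatermark("eventTime", "10 minutes")        // eventTime must be a timestamp column
    .groupBy(window(col("eventTime"), "5 minutes"))
    .count()
```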
Re: spark job error
Hi, Start with spark.executor.memory 2g. You may also give spark.yarn.executor.memoryOverhead a try. See https://spark.apache.org/docs/latest/configuration.html and https://spark.apache.org/docs/latest/running-on-yarn.html for more in-depth information. Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Tue, Jan 30, 2018 at 5:52 PM, shyla deshpande <deshpandesh...@gmail.com> wrote: > I am running Zeppelin on EMR. with the default settings. I am getting the > following error. Restarting the Zeppelin application fixes the problem. > > What default settings do I need to override that will help fix this error. > > org.apache.spark.SparkException: Job aborted due to stage failure: Task > 71 in stage 231.0 failed 4 times, most recent failure: Lost task 71.3 in > stage 231.0 Reason: Container killed by YARN for exceeding memory limits. > 1.4 GB of 1.4 GB physical memory used. Consider boosting > spark.yarn.executor.memoryOverhead. > > Thanks > >
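The two settings above, written out as a SparkConf sketch. The 512 (MiB) overhead value is only a guess to start from, not a recommendation. In Zeppelin on EMR these properties would normally go into the Spark interpreter settings rather than code, since they have to be in place before the executors are requested. From Spark 2.3 on the overhead property is named spark.executor.memoryOverhead, with the older spark.yarn.* name kept as a deprecated alias.

```scala
import org.apache.spark.SparkConf

// false = do not load spark.* system properties, so only the values below are set.
val conf = new SparkConf(false)
  .set("spark.executor.memory", "2g")
  // Off-heap headroom per executor container, in MiB (value is an assumption).
  .set("spark.yarn.executor.memoryOverhead", "512")
```

YARN sizes each executor container as the executor memory plus the overhead, which is why the error message in the quoted log suggests boosting the overhead.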
Re: Apache Spark - Spark Structured Streaming - Watermark usage
Hi, > I am trying to filter out records which are lagging behind (based on event time) by a certain amount of time. I'm curious how you would implement the "by a certain amount of time" requirement without a watermark. How would you know what's current and compute the lag? Let's forget about the watermark for a moment and see if it pops up as an inevitable feature :) Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Fri, Jan 26, 2018 at 7:14 PM, M Singh <mans2si...@yahoo.com.invalid> wrote: > Hi: > > I am trying to filter out records which are lagging behind (based on event > time) by a certain amount of time. > > Is the watermark API applicable to this scenario (i.e., filtering lagging > records) or is it only applicable with aggregation? I could not get a > clear understanding from the documentation which only refers to its usage > with aggregation. > > Thanks > > Mans >
Re: Best active groups, forums or contacts for Spark ?
Hi Esa, I'd say https://stackoverflow.com/questions/tagged/apache-spark is where many active sparkians hang out :) Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Fri, Jan 26, 2018 at 12:15 PM, Esa Heikkinen < esa.heikki...@student.tut.fi> wrote: > Hi > > > > It is very often difficult to get answers of question about Spark in many > forums.. Maybe they are inactive or my questions are too bad. I don’t know, > but does anyone know good active groups, forums or contacts other like this > ? > > > > Esa Heikkinen > > >
Re: Inner join with the table itself
Hi Michael, scala> spark.version res0: String = 2.4.0-SNAPSHOT scala> val r1 = spark.range(1) r1: org.apache.spark.sql.Dataset[Long] = [id: bigint] scala> r1.as("left").join(r1.as("right")).filter($"left.id" === $"right.id").show +---+---+ | id| id| +---+---+ | 0| 0| +---+---+ Am I missing something? When aliasing a Dataset, use the alias inside the column references (as in $"left.id"). Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Mon, Jan 15, 2018 at 3:26 PM, Michael Shtelma <mshte...@gmail.com> wrote: > Hi Jacek & Gengliang, > > let's take a look at the following query: > > val pos = spark.read.parquet(prefix + "POSITION.parquet") > pos.createOrReplaceTempView("POSITION") > spark.sql("SELECT POSITION.POSITION_ID FROM POSITION POSITION JOIN > POSITION POSITION1 ON POSITION.POSITION_ID0 = POSITION1.POSITION_ID > ").collect() > > This query is working for me right now using spark 2.2. > > Now we can try implementing the same logic with DataFrame API: > > pos.join(pos, pos("POSITION_ID0")===pos("POSITION_ID")).collect() > > I am getting the following error: > > "Join condition is missing or trivial. > > Use the CROSS JOIN syntax to allow cartesian products between these > relations.;" > > I have tried using alias function, but without success: > > val pos2 = pos.alias("P2") > pos.join(pos2, pos("POSITION_ID0")===pos2("POSITION_ID")).collect() > > This also leads us to the same error. > Am I missing something about the usage of alias? > > Now let's rename the columns: > > val pos3 = pos.toDF(pos.columns.map(_ + "_2"): _*) > pos.join(pos3, pos("POSITION_ID0")===pos3("POSITION_ID_2")).collect() > > It works! 
> > There is one more really odd thing about all this: a colleague of mine > has managed to get the same exception ("Join condition is missing or > trivial") also using original SQL query, but I think he has been using > empty tables. > > Thanks, > Michael > > > On Mon, Jan 15, 2018 at 11:27 AM, Gengliang Wang > <gengliang.w...@databricks.com> wrote: > > Hi Michael, > > > > You can use `Explain` to see how your query is optimized. > > https://docs.databricks.com/spark/latest/spark-sql/ > language-manual/explain.html > > I believe your query is an actual cross join, which is usually very slow > in > > execution. > > > > To get rid of this, you can set `spark.sql.crossJoin.enabled` as true. > > > > > > 在 2018年1月15日,下午6:09,Jacek Laskowski <ja...@japila.pl> 写道: > > > > Hi Michael, > > > > -dev +user > > > > What's the query? How do you "fool spark"? > > > > Pozdrawiam, > > Jacek Laskowski > > > > https://about.me/JacekLaskowski > > Mastering Spark SQL https://bit.ly/mastering-spark-sql > > Spark Structured Streaming https://bit.ly/spark-structured-streaming > > Mastering Kafka Streams https://bit.ly/mastering-kafka-streams > > Follow me at https://twitter.com/jaceklaskowski > > > > On Mon, Jan 15, 2018 at 10:23 AM, Michael Shtelma <mshte...@gmail.com> > > wrote: > >> > >> Hi all, > >> > >> If I try joining the table with itself using join columns, I am > >> getting the following error: > >> "Join condition is missing or trivial. Use the CROSS JOIN syntax to > >> allow cartesian products between these relations.;" > >> > >> This is not true, and my join is not trivial and is not a real cross > >> join. I am providing join condition and expect to get maybe a couple > >> of joined rows for each row in the original table. > >> > >> There is a workaround for this, which implies renaming all the columns > >> in source data frame and only afterwards proceed with the join. This > >> allows us to fool spark. 
> >> > >> Now I am wondering if there is a way to get rid of this problem in a > >> better way? I do not like the idea of renaming the columns because > >> this makes it really difficult to keep track of the names in the > >> columns in result data frames. > >> Is it possible to deactivate this check? > >> > >> Thanks, > >> Michael > >> > >> - > >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > >> > > > > >
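Applied to the POSITION example from this thread, the alias approach could look like the sketch below. The aliases "l" and "r", the helper name `selfJoin` and the output column name `MATCHED_POSITION_ID` are made up for the example. The key difference from pos("POSITION_ID0") === pos2("POSITION_ID") is that the columns are referenced through the alias, so each side of the condition resolves against a different side of the join.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Self-join without renaming columns: alias both sides and qualify every column.
def selfJoin(pos: DataFrame): DataFrame = {
  val l = pos.as("l")
  val r = pos.as("r")
  l.join(r, col("l.POSITION_ID0") === col("r.POSITION_ID"))
    .select(col("l.POSITION_ID"), col("r.POSITION_ID").as("MATCHED_POSITION_ID"))
}
```

The column names in the result stay recognizable, which addresses the concern about losing track of names after renaming everything.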
Re: Inner join with the table itself
Hi Michael, -dev +user What's the query? How do you "fool spark"? Pozdrawiam, Jacek Laskowski https://about.me/JacekLaskowski Mastering Spark SQL https://bit.ly/mastering-spark-sql Spark Structured Streaming https://bit.ly/spark-structured-streaming Mastering Kafka Streams https://bit.ly/mastering-kafka-streams Follow me at https://twitter.com/jaceklaskowski On Mon, Jan 15, 2018 at 10:23 AM, Michael Shtelma <mshte...@gmail.com> wrote: > Hi all, > > If I try joining the table with itself using join columns, I am > getting the following error: > "Join condition is missing or trivial. Use the CROSS JOIN syntax to > allow cartesian products between these relations.;" > > This is not true, and my join is not trivial and is not a real cross > join. I am providing join condition and expect to get maybe a couple > of joined rows for each row in the original table. > > There is a workaround for this, which implies renaming all the columns > in source data frame and only afterwards proceed with the join. This > allows us to fool spark. > > Now I am wondering if there is a way to get rid of this problem in a > better way? I do not like the idea of renaming the columns because > this makes it really difficult to keep track of the names in the > columns in result data frames. > Is it possible to deactivate this check? > > Thanks, > Michael > > - > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org > >
Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size
Hi,

> If the data is very large then a collect may result in OOM.

That's true in general for any part of Spark, incl. Spark Structured Streaming. Why would you collect in addBatch? It runs on the driver and, like anything on the driver, in a single JVM (which is usually not fault tolerant).

> Do you have any other suggestion/recommendation ?

What's wrong with the current solution? I don't think you should change how you do things currently. You should just avoid collect on large datasets (which holds anywhere in Spark).

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

> Thanks Tathagata for your answer.
>
> The reason I was asking about controlling data size is that the javadoc
> indicates you can use foreach or collect on the dataframe. If the data is
> very large then a collect may result in OOM.
>
> From your answer it appears that the only way to control the size (in 2.2)
> would be to control the trigger interval. However, in my case, I have to
> dedup the elements in a one-minute interval, which I am using as the
> trigger interval and cannot reduce. Do you have any other
> suggestion/recommendation ?
>
> Also, do you have any timeline for the availability of DataSourceV2/Spark
> 2.3 ?
>
> Thanks again.
>
>
> On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> 1. It is all the result data in that trigger. Note that it takes a
> DataFrame which is a purely logical representation of data and has no
> association with partitions, etc. which are physical representations.
>
> 2. If you want to limit the amount of data that is processed in a trigger,
> then you should either control the trigger interval or use the rate limit
> options on sources that support it (e.g. for kafka, you can use the option
> "maxOffsetsPerTrigger", see the guide
> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
> ).
>
> Related note, these APIs are subject to change. In fact in the upcoming
> release 2.3, we are adding a DataSource V2 API for
> batch/microbatch-streaming/continuous-streaming sources and sinks.
>
> On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2si...@yahoo.com.invalid>
> wrote:
>
> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>    * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
>    * this method is called more than once with the same batchId (which will happen in the case of
>    * failures), then `data` should only be added once.
>    *
>    * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
>    * Otherwise, you may get a wrong result.
>    *
>    * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
>    * after data is consumed by sink successfully.
>    */
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data in each DataFrame passed as the argument to
> addBatch -
> 1. Is it all the data in a partition for each trigger or is it all the
> data in that trigger ?
> 2. Is there a way to control the size in each addBatch invocation to make
> sure that we don't run into OOM exception on the executor while calling
> collect ?
>
> Thanks
>
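The rate limit mentioned above for the Kafka source could look roughly like this. It is only a sketch: the broker address, the topic name and the limit of 10000 offsets are made-up values, and the spark-sql-kafka-0-10 package is assumed to be on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object RateLimitedKafkaSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rate-limited-kafka-source")
      .getOrCreate()

    val records = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
      .option("subscribe", "events")                       // hypothetical topic
      // Upper bound on the number of offsets read per trigger, which in turn
      // bounds the size of the DataFrame handed to the sink for each batch.
      .option("maxOffsetsPerTrigger", 10000L)
      .load()

    val query = records
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

This bounds how much data a trigger brings in; it does not make collect on the driver any safer for batches that are still large.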
Re: Is spark-env.sh sourced by Application Master and Executor for Spark on YARN?
Hi,

My understanding is that the AM with the driver (in cluster deploy mode) and the executors are plain Java processes whose settings are set one by one while submitting a Spark application for execution and creating the ContainerLaunchContext for launching YARN containers.

See https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala?utf8=%E2%9C%93#L796-L801 for the code that does the settings-to-properties mapping.

With that I think conf/spark-defaults.conf won't be loaded by itself.

Why don't you set a property and see if it's available on the driver in cluster deploy mode? That should give you a definitive answer (or at least get you closer).

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Wed, Jan 3, 2018 at 7:57 AM, John Zhuge <jzh...@apache.org> wrote:

> Hi,
>
> I am running Spark 2.0.0 and 2.1.1 on YARN in a Hadoop 2.7.3 cluster. Is
> spark-env.sh sourced when starting the Spark AM container or the executor
> container?
>
> Saw this paragraph on
> https://github.com/apache/spark/blob/master/docs/configuration.md:
>
>> Note: When running Spark on YARN in cluster mode, environment variables
>> need to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName]
>> property in your conf/spark-defaults.conf file.
>> Environment variables that are set in spark-env.sh will not be reflected
>> in the YARN Application Master process in cluster mode. See the YARN-related
>> Spark Properties
>> <https://github.com/apache/spark/blob/master/docs/running-on-yarn.html#spark-properties>
>> for more information.
>
>
> Does it mean spark-env.sh will not be sourced when starting the AM in
> cluster mode?
> Does this paragraph apply to executors as well?
>
> Thanks,
> --
> John Zhuge
>
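The "set a property and see" experiment suggested above could be sketched as a tiny application that prints what the driver and the executors actually see. The variable name `MY_SETTING` is hypothetical, and this is a probe rather than a statement about what YARN will report.

```scala
import org.apache.spark.sql.SparkSession

object EnvCheck {
  def main(args: Array[String]): Unit = {
    // Name of the environment variable to probe (hypothetical default).
    val name = args.headOption.getOrElse("MY_SETTING")

    val spark = SparkSession.builder().appName("env-check").getOrCreate()
    import spark.implicits._

    // Driver side: in cluster deploy mode this code runs inside the
    // YARN Application Master container.
    println(s"driver:    $name=${sys.env.getOrElse(name, "<unset>")}")

    // Executor side: run a few tasks and report what their JVMs see.
    val seen = spark.range(0, 4, 1, 4)
      .mapPartitions(_ => Iterator(sys.env.getOrElse(name, "<unset>")))
      .distinct()
      .collect()
    println(s"executors: $name=${seen.mkString(", ")}")

    spark.stop()
  }
}
```

Submitting it in cluster deploy mode once with the variable exported only in spark-env.sh, and once with `--conf spark.yarn.appMasterEnv.MY_SETTING=...` and `--conf spark.executorEnv.MY_SETTING=...`, should show which route reaches which process. In cluster mode the driver output ends up in the YARN container logs, not on the submitting console.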
Re: How to...UNION ALL of two SELECTs over different data sources in parallel?
Thanks Silvio!

In the meantime, with the help of Adam and a code review of WholeStageCodegenExec and CollapseCodegenStages, I found out that anything that's codegen'd runs as part of the tasks in a stage and so is as parallel as those tasks are. In this case, the union of two codegen'd subtrees is indeed parallel.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Sat, Dec 16, 2017 at 7:12 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Jacek,
>
> Just replied to the SO thread as well, but…
>
> Yes, your first statement is correct. The DFs in the union are read in the
> same stage, so in your example where each DF has 8 partitions then you have
> a stage with 16 tasks to read the 2 DFs. There's no need to define the DF
> in a separate thread. You can verify this also in the Stage UI and looking
> at the Event Timeline. You should see the tasks across the DFs executing in
> parallel as expected.
>
> Here’s the UI for the following example, in which case each DF only has 1
> partition (so we get a stage with 2 tasks):
>
> spark.range(1, 100, 1, 1).write.save("/tmp/df1")
> spark.range(101, 200, 1, 1).write.save("/tmp/df2")
>
> spark.read.load("/tmp/df1").union(spark.read.load("/tmp/df2")).foreach { _ => }
>
>
> *From: *Jacek Laskowski <ja...@japila.pl>
> *Date: *Saturday, December 16, 2017 at 6:40 AM
> *To: *"user @spark" <user@spark.apache.org>
> *Subject: *How to...UNION ALL of two SELECTs over different data sources
> in parallel?
>
> Hi,
>
> I've been trying to find out the answer to the question about UNION ALL
> and SELECTs @ https://stackoverflow.com/q/47837955/1305344
>
> > If I have Spark SQL statement of the form SELECT [...] UNION ALL SELECT
> > [...], will the two SELECT statements be executed in parallel? In my
> > specific use case the two SELECTs are querying two different database
> > tables. In contrast to what I would have expected, the Spark UI seems to
> > suggest that the two SELECT statements are performed sequentially.
>
> How to know if the two separate SELECTs are executed in parallel or not?
> What are the tools to know it?
>
> My answer was to use explain operator that would show...well...physical
> plan, but am not sure how to read it to know whether a query plan is going
> to be executed in parallel or not.
>
> I then used the underlying RDD lineage (using rdd.toDebugString) hoping
> that gives me the answer, but...I'm not so sure.
>
> For a query like the following:
>
> val q = spark.range(1).union(spark.range(2))
>
> I thought that since both SELECTs are codegen'ed they could be executed in
> parallel, but when switched to the RDD lineage I lost my confidence given
> there's just one single stage (!)
>
> scala> q.rdd.toDebugString
> res4: String =
> (16) MapPartitionsRDD[17] at rdd at <console>:26 []
>  |   MapPartitionsRDD[16] at rdd at <console>:26 []
>  |   UnionRDD[15] at rdd at <console>:26 []
>  |   MapPartitionsRDD[11] at rdd at <console>:26 []
>  |   MapPartitionsRDD[10] at rdd at <console>:26 []
>  |   ParallelCollectionRDD[9] at rdd at <console>:26 []
>  |   MapPartitionsRDD[14] at rdd at <console>:26 []
>  |   MapPartitionsRDD[13] at rdd at <console>:26 []
>  |   ParallelCollectionRDD[12] at rdd at <console>:26 []
>
> What am I missing and how to be certain whether and what parts of a query
> are going to be executed in parallel?
>
> Please help...
>
> Regards,
> Jacek Laskowski
>
> https://about.me/JacekLaskowski
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
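One way to check the "one stage, tasks from both sides" claim without the web UI is to count the partitions of the union. A minimal sketch, assuming two in-memory ranges with 8 partitions each (the object and method names are made up for the example):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object UnionParallelism {
  // Union of two ranges, each split into `partitions` partitions.
  def unionOf(spark: SparkSession, partitions: Int): Dataset[java.lang.Long] =
    spark.range(0, 100, 1, partitions).union(spark.range(100, 200, 1, partitions))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("union-parallelism")
      .getOrCreate()

    val q = unionOf(spark, 8)
    q.explain()                                 // physical plan with a Union over two Range subtrees
    println(q.rdd.getNumPartitions)             // partitions of both sides add up
    println(q.rdd.toDebugString)                // lineage without a shuffle boundary

    spark.stop()
  }
}
```

Since there is no shuffle between the two sides and the union, all partitions become tasks of one stage, and how many of them run at the same time depends only on the available cores.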
How to...UNION ALL of two SELECTs over different data sources in parallel?
Hi,

I've been trying to find out the answer to the question about UNION ALL and SELECTs @ https://stackoverflow.com/q/47837955/1305344

> If I have Spark SQL statement of the form SELECT [...] UNION ALL SELECT
> [...], will the two SELECT statements be executed in parallel? In my
> specific use case the two SELECTs are querying two different database
> tables. In contrast to what I would have expected, the Spark UI seems to
> suggest that the two SELECT statements are performed sequentially.

How to know if the two separate SELECTs are executed in parallel or not? What are the tools to know it?

My answer was to use explain operator that would show...well...physical plan, but am not sure how to read it to know whether a query plan is going to be executed in parallel or not.

I then used the underlying RDD lineage (using rdd.toDebugString) hoping that gives me the answer, but...I'm not so sure.

For a query like the following:

val q = spark.range(1).union(spark.range(2))

I thought that since both SELECTs are codegen'ed they could be executed in parallel, but when switched to the RDD lineage I lost my confidence given there's just one single stage (!)

scala> q.rdd.toDebugString
res4: String =
(16) MapPartitionsRDD[17] at rdd at <console>:26 []
 |   MapPartitionsRDD[16] at rdd at <console>:26 []
 |   UnionRDD[15] at rdd at <console>:26 []
 |   MapPartitionsRDD[11] at rdd at <console>:26 []
 |   MapPartitionsRDD[10] at rdd at <console>:26 []
 |   ParallelCollectionRDD[9] at rdd at <console>:26 []
 |   MapPartitionsRDD[14] at rdd at <console>:26 []
 |   MapPartitionsRDD[13] at rdd at <console>:26 []
 |   ParallelCollectionRDD[12] at rdd at <console>:26 []

What am I missing and how to be certain whether and what parts of a query are going to be executed in parallel?

Please help...

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
Re: is Union or Join Supported for Spark Structured Streaming Queries in 2.2.0?
Hi,

A join between streaming and batch/static Datasets is supported for sure --> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations

I'm not sure about union, but that's easy to check (and I am leaving it as your home exercise).

You cannot have datasets of different schemas in a query. You'd have to use the widest schema, one that covers all the schemas.

p.s. Have you tried anything...spark-shell's your friend, my friend :)

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Wed, Dec 13, 2017 at 11:16 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I have messages in a queue that might be coming in with a few different
> schemas like
> msg 1 schema 1, msg2 schema2, msg3 schema3, msg 4 schema1
>
> I want to put all of this in one data frame. Is it possible with
> structured streaming?
>
> I am using Spark 2.2.0
>
> Thanks!
>
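The "widest schema" idea can be tried out in spark-shell on a static DataFrame first. A minimal sketch, assuming JSON messages and a hypothetical superset schema with the fields `id`, `name` and `price`; fields that a message lacks simply come back as null:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}

object WideSchema {
  // Hypothetical superset of the fields found across all message schemas.
  val wide: StructType = new StructType()
    .add("id", LongType)
    .add("name", StringType)
    .add("price", DoubleType)

  // Parses the JSON string column "value" against the wide schema.
  def parse(messages: DataFrame): DataFrame =
    messages.select(from_json(col("value"), wide).as("m")).select("m.*")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("wide-schema")
      .getOrCreate()
    import spark.implicits._

    // Two messages with different shapes, standing in for the queue.
    val messages = Seq(
      """{"id": 1, "name": "a"}""",
      """{"id": 2, "price": 9.5}"""
    ).toDF("value")

    parse(messages).show()
    spark.stop()
  }
}
```

The same `parse` step should apply unchanged to a streaming DataFrame with a string `value` column, since from_json is a plain column function.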
Re: Why Spark 2.2.1 still bundles old Hive jars?
Hi,

https://issues.apache.org/jira/browse/SPARK-19076

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 7:43 AM, An Qin <a...@qilinsoft.com> wrote:

> Hi, all,
>
> I want to include Sentry 2.0.0 in my Spark project. However it bundles
> Hive 2.3.2. I find the newest Spark 2.2.1 still bundles old Hive jars, for
> example, hive-exec-1.2.1.spark2.jar. Why doesn't it upgrade to the new Hive?
> Are they compatible?
>
> Regards,
>
> Qin An.
>
Re: Infer JSON schema in structured streaming Kafka.
Hi,

What about a custom streaming Sink that would stop the query after addBatch has been called?

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi Jacek,
>
> For now, I am using Thread.sleep() on the driver to make sure my streaming
> query receives some data, and then stop it before control reaches the
> code querying the memory table.
> Let me know if there is any better way of handling it.
>
> Regards,
> Satyajit.
>
> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi Jacek,
>>
>> Thank you for responding back,
>>
>> I have tried the memory sink, and below is what I did:
>>
>> val fetchValue = debeziumRecords.selectExpr("value")
>>   .withColumn("tableName",
>>     functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>>   .withColumn("operation",
>>     functions.get_json_object($"value".cast(StringType), "$.payload.op"))
>>   .withColumn("payloadAfterValue",
>>     split(substring_index(debeziumRecords("value"), "\"after\":", -1),
>>       ",\"source\"").getItem(0))
>>   .drop("tableName").drop("operation").drop("value").as[String].writeStream
>>   .outputMode(OutputMode.Append())
>>   .queryName("record")
>>   .format("memory")
>>   .start()
>>
>> spark.sql("select * from record").show(truncate = false)
>> // I was expecting to be able to use the record table to read the JSON
>> // string, but the table is empty for the first call. And I do not see
>> // any dataframe output after the first one.
>>
>> *But yeah, the above steps work well and I can do the things that I need
>> to in spark-shell. The problem is when I try to code in IntelliJ, because
>> the streaming query keeps running and I am not sure how to identify and
>> stop the streaming query and use the record memory table.*
>>
>> So I would like to stop the streaming query once I know I have some data
>> in my record memory table (is there a way to do that?), so I can stop the
>> streaming query, use the memory table and fetch my record.
>> Any help on how to approach the situation programmatically, or any
>> pointers to examples, would be highly appreciated.
>>
>> Regards,
>> Satyajit.
>>
>>
>>
>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> What about memory sink? That could work.
>>>
>>> Regards,
>>> Jacek Laskowski
>>>
>>> https://about.me/JacekLaskowski
>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>>> satyajit.apas...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I would like to infer a JSON schema from a sample of the data that I
>>>> receive from Kafka Streams (specific topic), and I have to infer the
>>>> schema as I am going to receive random JSON strings with a different
>>>> schema for each topic, so I chose to go ahead with the steps below:
>>>>
>>>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>>>> b. Somehow store the JSON string into a val and infer the schema.
>>>> c. Stop the stream.
>>>> d. Create a new readStream (smallest offset) and use the above inferred
>>>> schema to process the JSON using the Spark-provided JSON support, like
>>>> from_json, json_object and others, and run my actual business logic.
>>>>
>>>> Now I am not sure how to be successful with step (b). Any help would be
>>>> appreciated.
>>>> And I would also like to know if there is any better approach.
>>>>
>>>> Regards,
>>>> Satyajit.
>>>>
>>>
>>>
>>
>
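As a possible alternative to starting and stopping a streaming query for step (b), schema inference can be done on a plain Dataset[String] with the batch JSON reader (available from Spark 2.2). This is not what was proposed in the thread, only a sketch; the sample record below is made up and merely resembles the Debezium payloads mentioned above.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.StructType

object InferJsonSchema {
  // Infers a schema from a sample of JSON strings using the batch JSON reader.
  def infer(spark: SparkSession, sample: Dataset[String]): StructType =
    spark.read.json(sample).schema

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("infer-json-schema")
      .getOrCreate()
    import spark.implicits._

    // Assumption: in a real job the sample would come from a batch read of
    // the topic, e.g. spark.read.format("kafka") with the usual options,
    // followed by selectExpr("CAST(value AS STRING)").as[String].
    val sample = Seq(
      """{"payload": {"op": "c", "after": {"id": 1, "name": "a"}}}"""
    ).toDS()

    val schema = infer(spark, sample)
    println(schema.treeString)
    // The inferred schema can then be passed to from_json in the streaming query.
    spark.stop()
  }
}
```

With this approach there is no streaming query to stop before the schema is available, so neither Thread.sleep() nor a custom sink is needed for the inference step.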
Re: Infer JSON schema in structured streaming Kafka.
Hi,

What about memory sink? That could work.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> I would like to infer a JSON schema from a sample of the data that I
> receive from Kafka Streams (specific topic), and I have to infer the
> schema as I am going to receive random JSON strings with a different
> schema for each topic, so I chose to go ahead with the steps below:
>
> a. readStream from Kafka (latest offset), from a single Kafka topic.
> b. Somehow store the JSON string into a val and infer the schema.
> c. Stop the stream.
> d. Create a new readStream (smallest offset) and use the above inferred
> schema to process the JSON using the Spark-provided JSON support, like
> from_json, json_object and others, and run my actual business logic.
>
> Now I am not sure how to be successful with step (b). Any help would be
> appreciated.
> And I would also like to know if there is any better approach.
>
> Regards,
> Satyajit.
>
Re: pyspark + from_json(col("col_name"), schema) returns all null
Hi,

Not that I'm aware of, but in your case checking whether a JSON message fits your schema and the pipeline would have taken nothing more than pyspark and a few JSON files on disk, wouldn't it?

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 12:49 AM, salemi <alireza.sal...@udo.edu> wrote:

> I found the root cause! There was a mismatch between the StructField type
> and the JSON message.
>
> Is there a good write-up / wiki out there that describes how to debug
> Spark jobs?
>
> Thanks
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
Re: RDD[internalRow] -> DataSet
Hi Satyajit,

That's exactly what Dataset.rdd does --> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala?utf8=%E2%9C%93#L2916-L2921

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Fri, Dec 8, 2017 at 5:25 AM, satyajit vegesna <satyajit.apas...@gmail.com>
wrote:

> Hi All,
>
> Is there a way to convert RDD[InternalRow] to a Dataset from outside the
> Spark SQL package?
>
> Regards,
> Satyajit.
>
Re: Struct Type
Hi,

Use the explode function, the filter operator and the collect_list function. Or the "heavier" flatMap.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Fri, Nov 17, 2017 at 6:06 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I have the following schema in a dataframe and I want to extract the key
> that matches MaxSpeed from the array, along with its corresponding value.
>
> |-- tags: array (nullable = true)
> |    |-- element: struct (containsNull = true)
> |    |    |-- key: string (nullable = true)
> |    |    |-- value: string (nullable = true)
>
> Is there any way to achieve it in a dataframe?
>
> Thanks,
> Asmath
>
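The explode-then-filter route suggested above could be sketched as follows. The `tags` column follows the schema from the question; the `id` column and the sample rows are assumptions added to have something to group the result by.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

// Mirrors the element struct of the tags array from the question.
final case class Tag(key: String, value: String)

object MaxSpeedTags {
  // One output row per MaxSpeed tag: explode the array into rows,
  // keep the matching key, and project its value.
  def maxSpeed(df: DataFrame): DataFrame =
    df.select(col("id"), explode(col("tags")).as("tag"))
      .where(col("tag.key") === "MaxSpeed")
      .select(col("id"), col("tag.value").as("maxSpeed"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("max-speed-tags")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data; "id" is not part of the original schema.
    val df = Seq(
      (1, Seq(Tag("MaxSpeed", "50"), Tag("Name", "A1"))),
      (2, Seq(Tag("Name", "B2")))
    ).toDF("id", "tags")

    maxSpeed(df).show()
    spark.stop()
  }
}
```

Rows without a MaxSpeed tag drop out of the result; collect_list, as mentioned in the reply, would be the way to fold several matches per id back into an array.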
Re: Restart Spark Streaming after deployment
Hi,

You're right...killing the Spark Streaming job is the way to go. If a batch was completed successfully, Spark Streaming will recover from the controlled failure and start where it left off. I don't think there's any other way to do it.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Wed, Nov 15, 2017 at 5:18 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am new to Spark Streaming. I have developed one Spark Streaming job
> which runs every 30 minutes with a checkpointing directory.
>
> I have to implement a minor change. Shall I kill the Spark Streaming job
> once the batch is completed, using the yarn application -kill command, and
> update the jar file?
>
> The question I have is: if I follow the above approach, will Spark
> Streaming pick up data from the offset saved in the checkpoint after the
> restart?
>
> Are there any better approaches? Thanks in advance for your suggestions.
>
> Thanks,
> Asmath
>
Re: Structured Streaming and Hive
Hi,

Guessing it's a timing issue. Once you started the query, batch 0 either did not have rows to save or had not started yet (it runs on a separate thread), and so spark.sql ran once and saved nothing.

You should rather use a foreach writer to save results to Hive.

Jacek

On 29 Sep 2017 11:36 am, "HanPan" wrote:

> Hi guys,
>
> I’m new to spark structured streaming. I’m using 2.1.0 and my
> scenario is reading a specific topic from kafka and doing some data mining
> tasks, then saving the result dataset to hive.
>
> While writing data to hive, somehow it seems like it is not supported yet,
> and I tried this:
>
> It runs ok, but no result in hive.
>
> Any idea on writing the stream result to hive?
>
> Thanks
>
> Pan
>
Re: How to read from multiple kafka topics using structured streaming (spark 2.2.0)?
Hi,

Cody's right.

subscribe - Topic subscription strategy that accepts topic names as a comma-separated string, e.g. topic1,topic2,topic3 [1]

subscribePattern - Topic subscription strategy that uses Java’s java.util.regex.Pattern for the regex pattern of topics to subscribe to, e.g. topic\d [2]

[1] https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html#subscribe
[2] https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html#subscribepattern

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+) https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Tue, Sep 19, 2017 at 10:34 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You should be able to pass a comma-separated string of topics to
> subscribe. subscribePattern isn't necessary.
>
>
> On Tue, Sep 19, 2017 at 2:54 PM, kant kodali <kanth...@gmail.com> wrote:
> > got it! Sorry.
> >
> > On Tue, Sep 19, 2017 at 12:52 PM, Jacek Laskowski <ja...@japila.pl>
> > wrote:
> >>
> >> Hi,
> >>
> >> Use subscribePattern
> >>
> >> You haven't googled well enough -->
> >> https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html
> >> :)
> >>
> >> Regards,
> >> Jacek Laskowski
> >>
> >> https://about.me/JacekLaskowski
> >> Spark Structured Streaming (Apache Spark 2.2+)
> >> https://bit.ly/spark-structured-streaming
> >> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >> On Tue, Sep 19, 2017 at 9:50 PM, kant kodali <kanth...@gmail.com>
> >> wrote:
> >>>
> >>> Hi All,
> >>>
> >>> I am wondering how to read from multiple kafka topics using structured
> >>> streaming (code below)? I googled prior to asking this question and I
> >>> see responses related to DStreams but not structured streams. Is it
> >>> possible to read multiple topics using the same spark structured
> >>> stream?
> >>>
> >>> sparkSession.readStream()
> >>>     .format("kafka")
> >>>     .option("kafka.bootstrap.servers", "localhost:9092")
> >>>     .option("subscribe", "hello1")
> >>>     .option("startingOffsets", "earliest")
> >>>     .option("failOnDataLoss", "false")
> >>>     .load();
> >>>
> >>>
> >>> Thanks!
> >>
> >>
> >
>
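Put together, the two subscription strategies from this thread could be sketched like this. The broker address and the topic names `hello1` and `hello2` are taken from or modelled on the question; the regex is an assumption, and only one strategy may be set on a given reader.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiTopicKafka {
  // Comma-separated list of topic names.
  def byList(spark: SparkSession): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "hello1,hello2")
      .option("startingOffsets", "earliest")
      .load()

  // Regular expression over topic names (java.util.regex syntax).
  def byPattern(spark: SparkSession): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribePattern", "hello\\d+")
      .option("startingOffsets", "earliest")
      .load()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("multi-topic-kafka").getOrCreate()

    // The "topic" column tells the records of the different topics apart.
    val query = byList(spark)
      .selectExpr("topic", "CAST(value AS STRING) AS value")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Running it needs the spark-sql-kafka-0-10 package on the classpath and a reachable broker.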
Re: Structured streaming coding question
Hi,

Ah, right! Start the queries first and, once they're all running, call awaitTermination on them.

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+) https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Wed, Sep 20, 2017 at 7:09 AM, kant kodali <kanth...@gmail.com> wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
>
> Doesn't work
>
>
>
> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have the following pseudo code (I could paste the real code, however it
>> is pretty long and involves database calls inside the dataset.map
>> operation and so on), so I am just trying to simplify my question. I would
>> like to know if there is something wrong with the following pseudo code.
>>
>> DataSet inputDS = readFromKafka(topicName)
>>
>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works since I can
>> see data getting populated
>>
>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>>
>> DataSet outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start().awaitTermination()
>>
>>
>> *So what's happening with the above code is that I can see data coming
>> out of the hello1 topic but not from the hello2 topic.* I thought there
>> was something wrong with "outputDS2", so I switched the order and now the
>> code looks like this:
>>
>> DataSet inputDS = readFromKafka(topicName)
>>
>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works since I can
>> see data getting populated
>>
>> DataSet outputDS2 = mongoDS.map(readFromDatabase); // This works
>>
>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start().awaitTermination()
>>
>> *Now I can see data coming out from the hello2 kafka topic but not from
>> the hello1 topic*. *In short, I can only see data from outputDS1 or
>> outputDS2 but not both.* At this point I am not sure what is going on.
>>
>> Thanks!
>>
>>
>>
>
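The ordering issue in the pseudo code above comes from calling awaitTermination() right after the first start(): that call blocks, so the second query is never started. A minimal sketch of the "start everything first, then wait" pattern, using the built-in rate source and console sink as stand-ins for the Kafka source and the KafkaSink from the thread:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object TwoQueries {
  // Starts both queries and returns immediately; start() does not block.
  def startBoth(spark: SparkSession): Seq[StreamingQuery] = {
    // Stand-in for the Kafka source of the thread.
    val input = spark.readStream.format("rate").load()

    val q1 = input.selectExpr("value AS hello1")
      .writeStream.format("console").start()
    val q2 = input.selectExpr("value * 2 AS hello2")
      .writeStream.format("console").start()
    Seq(q1, q2)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("two-queries")
      .getOrCreate()

    startBoth(spark)
    // Block only after every query is running; returns when any of them stops.
    spark.streams.awaitAnyTermination()
  }
}
```

Calling awaitTermination() on each returned query after both have been started would work as well; the point is only that no blocking call sits between the two start() calls.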
Re: Structured streaming coding question
Hi,

What's the code in readFromKafka to read from hello2 and hello1?

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+) https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Tue, Sep 19, 2017 at 10:54 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I have the following pseudo code (I could paste the real code, however it
> is pretty long and involves database calls inside the dataset.map
> operation and so on), so I am just trying to simplify my question. I would
> like to know if there is something wrong with the following pseudo code.
>
> DataSet inputDS = readFromKafka(topicName)
>
> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works since I can
> see data getting populated
>
> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>
> DataSet outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start().awaitTermination()
>
>
> *So what's happening with the above code is that I can see data coming
> out of the hello1 topic but not from the hello2 topic.* I thought there
> was something wrong with "outputDS2", so I switched the order and now the
> code looks like this:
>
> DataSet inputDS = readFromKafka(topicName)
>
> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works since I can
> see data getting populated
>
> DataSet outputDS2 = mongoDS.map(readFromDatabase); // This works
>
> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start().awaitTermination()
>
> *Now I can see data coming out from the hello2 kafka topic but not from
> the hello1 topic*. *In short, I can only see data from outputDS1 or
> outputDS2 but not both.* At this point I am not sure what is going on.
>
> Thanks!
>