Re: Error while merge in delta table

2023-05-11 Thread Jacek Laskowski
Hi Karthick,

Sorry to say it but there's not enough "data" to help you. There should be
something more above or below this exception snippet you posted that could
pinpoint the root cause.

Pozdrawiam,
Jacek Laskowski

"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, May 10, 2023 at 8:03 PM Karthick Nk  wrote:

> Hi,
>
> I am trying to merge a dataframe with a delta table in Databricks, but I am
> getting an error. I have attached the code snippet and error message for
> reference below.
>
> code:
> [image: image.png]
>
> error:
>
> [image: image.png]
>
> Thanks
>


Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-04-14 Thread Jacek Laskowski
Hi,

Start with intercepting stage completions using SparkListenerStageCompleted
[1]. That's Spark Core (jobs, stages and tasks).

Go up the execution chain to Spark SQL with SparkListenerSQLExecutionStart
[2] and SparkListenerSQLExecutionEnd [3], and correlate the events.

You may want to look at how the web UI works under the covers to collect all
the information. Start from SQLTab [4], which shows what is displayed
(and hence what's needed and how it's collected).

[1]
https://github.com/apache/spark/blob/8cceb3946bdfa5ceac0f2b4fe6a7c43eafb76d59/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L46
[2]
https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44
[3]
https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L60
[4]
https://github.com/apache/spark/blob/c124037b97538b2656d29ce547b2a42209a41703/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala#L24
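
A minimal sketch of such a listener (assuming the Spark 3.x Scala API; the
class name and log messages below are made up):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerStageCompleted}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Correlate Spark Core stage completions with Spark SQL execution start/end events.
class StageAndSqlListener extends SparkListener {

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    println(s"Stage ${info.stageId} completed with ${info.numTasks} tasks")
  }

  // SQL execution events arrive as "other" events on the listener bus.
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} ended at ${e.time}")
    case _ => // ignore everything else
  }
}

// Register it, e.g. in spark-shell:
// spark.sparkContext.addSparkListener(new StageAndSqlListener)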

Pozdrawiam,
Jacek Laskowski

"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, Apr 13, 2023 at 10:40 AM Trường Trần Phan An 
wrote:

> Hi,
>
> Can you give me more details or give me a tutorial on "You'd have to
> intercept execution events and correlate them. Not an easy task yet doable"
>
> Thank
>
> On Wed, 12 Apr 2023 at 21:04, Jacek Laskowski 
> wrote:
>
>> Hi,
>>
>> tl;dr it's not possible to "reverse-engineer" tasks to functions.
>>
>> In essence, Spark SQL is an abstraction layer over RDD API that's made up
>> of partitions and tasks. Tasks are Scala functions (possibly with some
>> Python for PySpark). A simple-looking high-level operator like
>> DataFrame.join can end up with multiple RDDs, each with a set of partitions
>> (and hence tasks). What the tasks do is an implementation detail that you'd
>> have to know about by reading the source code of Spark SQL that produces
>> the "bytecode".
>>
>> Just looking at the DAG or the tasks screenshots won't give you that
>> level of detail. You'd have to intercept execution events and correlate
>> them. Not an easy task yet doable. HTH.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>>
>> On Tue, Apr 11, 2023 at 6:53 PM Trường Trần Phan An <
>> truong...@vlute.edu.vn> wrote:
>>
>>> Hi all,
>>>
>>> I am conducting a study comparing the execution time of Bloom Filter
>>> Join operation on two environments: Apache Spark Cluster and Apache Spark.
>>> I have compared the overall time of the two environments, but I want to
>>> compare specific "tasks on each stage" to see which computation has the
>>> most significant difference.
>>>
>>> I have taken a screenshot of the DAG of Stage 0 and the list of tasks
>>> executed in Stage 0.
>>> - DAG.png
>>> - Task.png
>>>
>>> *I have questions:*
>>> 1. Can we determine which tasks are responsible for executing each step
>>> scheduled on the DAG during the processing?
>>> 2. Is it possible to know the function of each task (e.g., what is task
>>> ID 0 responsible for? What is task ID 1 responsible for? ... )?
>>>
>>> Best regards,
>>> Truong
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: How to create spark udf use functioncatalog?

2023-04-14 Thread Jacek Laskowski
Hi,

I'm not sure I understand the question, but if your question is how to
register (plug in) your own custom FunctionCatalog, it's done through the
spark.sql.catalog.<catalog-name> configuration property, e.g.

spark.sql.catalog.catalog-name=com.example.YourCatalogClass

spark.sql.catalog.<catalog-name> registers a CatalogPlugin that in your case is
also supposed to be a FunctionCatalog.

When needed, the implicit class CatalogHelper.asFunctionCatalog converts your
custom CatalogPlugin (e.g., catalog-name above) to a FunctionCatalog so that
functions identified by three-part identifiers (catalog.schema.function)
are resolved and used properly with the custom catalog implementation.
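
A hedged sketch of the wiring (the catalog class, catalog name, namespace and
function name below are all made up; the class is assumed to implement
org.apache.spark.sql.connector.catalog.FunctionCatalog):

import org.apache.spark.sql.SparkSession

// Plug a hypothetical FunctionCatalog implementation in under the name "my_catalog".
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.my_catalog", "com.example.MyFunctionCatalog")
  .getOrCreate()

// Assuming the catalog exposes a scalar function "strlen" in the "ns" namespace,
// it can then be referenced in SQL with a three-part identifier.
spark.sql("SELECT my_catalog.ns.strlen('Spark SQL')").show()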

HTH

Pozdrawiam,
Jacek Laskowski

"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Fri, Apr 14, 2023 at 2:10 PM 许新浩 <948718...@qq.com.invalid> wrote:

> We are using Spark. Today I saw the FunctionCatalog, and I have seen the
> source of
> spark\sql\core\src\test\scala\org\apache\spark\sql\connector\DataSourceV2FunctionSuite.scala
> and have implemented the ScalarFunction. But I still do not know how
> to register it in SQL.


Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-04-12 Thread Jacek Laskowski
Hi,

tl;dr it's not possible to "reverse-engineer" tasks to functions.

In essence, Spark SQL is an abstraction layer over RDD API that's made up
of partitions and tasks. Tasks are Scala functions (possibly with some
Python for PySpark). A simple-looking high-level operator like
DataFrame.join can end up with multiple RDDs, each with a set of partitions
(and hence tasks). What the tasks do is an implementation detail that you'd
have to know about by reading the source code of Spark SQL that produces
the "bytecode".

Just looking at the DAG or the tasks screenshots won't give you that level
of detail. You'd have to intercept execution events and correlate them. Not
an easy task yet doable. HTH.

Pozdrawiam,
Jacek Laskowski

"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Tue, Apr 11, 2023 at 6:53 PM Trường Trần Phan An 
wrote:

> Hi all,
>
> I am conducting a study comparing the execution time of Bloom Filter Join
> operation on two environments: Apache Spark Cluster and Apache Spark. I
> have compared the overall time of the two environments, but I want to
> compare specific "tasks on each stage" to see which computation has the
> most significant difference.
>
> I have taken a screenshot of the DAG of Stage 0 and the list of tasks
> executed in Stage 0.
> - DAG.png
> - Task.png
>
> *I have questions:*
> 1. Can we determine which tasks are responsible for executing each step
> scheduled on the DAG during the processing?
> 2. Is it possible to know the function of each task (e.g., what is task ID
> 0 responsible for? What is task ID 1 responsible for? ... )?
>
> Best regards,
> Truong
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: [SparkSQL, SparkUI, RESTAPI] How to extract the WholeStageCodeGen ids from SparkUI

2023-04-12 Thread Jacek Laskowski
Hi,

You could use QueryExecutionListener or Spark listeners to intercept query
execution events and extract whatever is required. That's what web UI does
(as it's simply a bunch of SparkListeners --> https://youtu.be/mVP9sZ6K__Y
;-)).
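
A minimal sketch of such a listener (assuming Spark 3.x; WholeStageCodegenExec
and its codegenStageId are internal APIs, so treat this as unsupported and
version-dependent):

import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec}
import org.apache.spark.sql.util.QueryExecutionListener

// Collect the WholeStageCodegen ids from the executed physical plan of every successful query.
class CodegenIdListener extends QueryExecutionListener {

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    val codegenIds = qe.executedPlan.collect {
      case w: WholeStageCodegenExec => w.codegenStageId
    }
    println(s"$funcName: WholeStageCodegen ids = ${codegenIds.mkString(", ")}")
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

// Register it, e.g. in spark-shell:
// spark.listenerManager.register(new CodegenIdListener)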

Pozdrawiam,
Jacek Laskowski

"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Fri, Apr 7, 2023 at 12:23 PM Chenghao Lyu  wrote:

> Hi,
>
> The detailed stage page shows the involved WholeStageCodegen Ids in its
> DAG visualization from the Spark UI when running a SparkSQL. (e.g., under
> the link
> node:18088/history/application_1663600377480_62091/stages/stage/?id=1=0).
>
> However, I have trouble extracting the WholeStageCodegen ids from the DAG
> visualization via the RESTAPIs. Is there any other way to get the
> WholeStageCodegen Ids information for each stage automatically?
>
> Cheers,
> Chenghao
>


Re: spark.catalog.listFunctions type signatures

2023-03-28 Thread Jacek Laskowski
Hi,

Interesting question indeed!

The closest I could get would be to use lookupFunctionBuilder(name:
FunctionIdentifier): Option[FunctionBuilder] [1] followed by extracting the
dataType from T in `type FunctionBuilder = Seq[Expression] => T` which can
be Expression (regular functions) or LogicalPlan (table-valued functions).
Expression has got dataType while LogicalPlan has got output
(or outputAttributes).

HTH

Let us know how you're doing.

BTW, can you describe how you are "using Apache Calcite to run some SQL
transformations on Apache Spark SQL statements"?

[1]
https://github.com/apache/spark/blob/e60ce3e85081ca8bb247aeceb2681faf6a59a056/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L91

Pozdrawiam,
Jacek Laskowski

"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Tue, Mar 28, 2023 at 9:01 PM Guillaume Masse <
masse.guilla...@narrative.io> wrote:

> Hi,
>
> I'm using Apache Calcite to run some SQL transformations on Apache Spark
> SQL statements. I would like to extract the type signature out
> of spark.catalog.listFunctions to be able to register them in Calcite with
> their proper signature.
>
> From the API, I can get the fully qualified class name and the name, but
> unfortunately, the type signature is not present. Would there be a way to
> use reflection to extract? For example:
>
>
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala#L424
>
> Ideally, it would be convenient to get the type signature
> from org.apache.spark.sql.catalog.Function itself when available.
>
>
> --
> Guillaume Massé
> [Gee-OHM]
> (马赛卫)
>


Re: [ANNOUNCE] Apache Spark 3.3.1 released

2022-10-26 Thread Jacek Laskowski
Yoohoo! Thanks Yuming for driving this release. A tiny step for Spark, a
huge one for my clients (who are still on 3.2.1 or even older :))

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Oct 26, 2022 at 8:22 AM Yuming Wang  wrote:

> We are happy to announce the availability of Apache Spark 3.3.1!
>
> Spark 3.3.1 is a maintenance release containing stability fixes. This
> release is based on the branch-3.3 maintenance branch of Spark. We strongly
> recommend all 3.3 users to upgrade to this stable release.
>
> To download Spark 3.3.1, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-3-1.html
>
> We would like to acknowledge all community members for contributing to this
> release. This release would not have been possible without you.
>
>
>


Re: Prometheus with spark

2022-10-25 Thread Jacek Laskowski
Hi Raj,

Do you want to do the following?

spark.read.format("prometheus").load...

I haven't heard of such a data source / format before.

What would you like it for?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Fri, Oct 21, 2022 at 6:12 PM Raj ks  wrote:

> Hi Team,
>
>
> We wanted to query Prometheus data with Spark. Any suggestions would
> be appreciated.
>
> I searched for documentation but did not find anything definitive.
>


Re: query time comparison to several SQL engines

2022-04-07 Thread Jacek Laskowski
Hi Wes,

Thanks for the report! I like it (mostly because it's short and concise).
Thank you.

I know nothing about Drill and am curious about the similar execution times
and this sentence in the report: "Spark is the second fastest, that should
be reasonable, since both Spark and Drill have almost the same
implementation architecture.".

Is it true that Drill is Spark (or vice versa) under the hood? If so, how
is it possible that Drill is faster? What does Drill do to make the query
faster? Could it be that you used a type of query Drill is optimized for?
Just guessing and really curious (not implying that one is better or
worse than the other(s)).

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, Apr 7, 2022 at 1:05 PM Wes Peng  wrote:

> I made a simple test of query time for several SQL engines, including
> MySQL, Hive, Drill and Spark. The report:
>
> https://cloudcache.net/data/query-time-mysql-hive-drill-spark.pdf
>
> It may have no special meaning, just for fun. :)
>
> regards.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-09-05 Thread Jacek Laskowski
Hi,

Still no idea, but I noticed
"org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
"spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
\", which bother me quite a lot.

First of all, it's a Spark Streaming (not Structured Streaming) app.
Correct? Please upgrade at your earliest convenience since it's no longer
in active development (if supported at all).

Secondly, why are these jars listed explicitly since they're part of Spark?
You should not really be doing such risky config changes (unless you've got
no other choice and you know what you're doing).

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou 
wrote:

> Yes you are right.
> I am using Spring Boot for this.
>
> The same does work for the case that does not involve any Kafka events.
> But again I am not sending out extra jars there, so nothing is replaced and
> we are using the default ones.
>
> If I do not use userClassPathFirst, which forces the service to use
> the newer version, I will end up with the same problem.
>
> We are using protobuf v3+ and as such we need to push that version since
> apache core uses an older version.
>
> So all we should really need is the following : --jars
> "protobuf-java-3.17.3.jar" \
> and here we need the userClassPathFirst=true in order to use the latest
> version.
>
>
> Using only this jar (as it works locally), or with no jars defined, we ended up
> with the following error.
>
> 21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
> task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
> java.lang.ClassNotFoundException:
> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>
> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>
> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>
> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>
> at java.base/java.lang.Class.forName0(Native Method)
>
> at java.base/java.lang.Class.forName(Unknown Source)
>
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>
>
>
>
> Which can be resolved with passing more jars.
>
>
> Any idea about this error ?
>
> K8 does not seem to like this, but Java Spring should be the one that is
> responsible for the version, yet it seems K8 does not like these versions.
>
> Perhaps a misconfiguration on K8?
>
> I haven't set that up, so I am not aware of what was done there.
>
>
>
> Downgrading to Java 8 on my K8 might not be so easy. I want to explore
> whether there is something else before doing that, as we will need to spin up
> new instances of K8 to check that.
>
>
>
> Thank you for the time taken
>
>
>
>
> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski  wrote:
>
>> Hi Stelios,
>>
>> I've never seen this error before, but a couple of things caught
>> my attention that I would look at closer to chase the root cause of the
>> issue.
>>
>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>> Application run failed" seem to indicate that you're using Spring Boot
>> (that I know almost nothing about so take the following with a pinch of
>> salt :))
>>
>> Spring Boot manages the classpath by itself and together with another
>> interesting option in your
>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>> much this exception:
>>
>> > org.apache.spark.scheduler.ExternalClusterManager:
>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>> subtype
>>
>> could be due to casting compatible types from two different classloaders?
>>
>> Just a thought but wanted to share as I think it's worth investigating.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>>
>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou 
>> wrote:
>>
>>> Hello,
>>>
>>> I have been facing the current issue for some time now

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

2021-08-31 Thread Jacek Laskowski
Hi Stelios,

I've never seen this error before, but a couple of things caught
my attention that I would look at closer to chase the root cause of the
issue.

"org.springframework.context.annotation.AnnotationConfigApplicationContext:"
and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
Application run failed" seem to indicate that you're using Spring Boot
(that I know almost nothing about so take the following with a pinch of
salt :))

Spring Boot manages the classpath by itself and together with another
interesting option in your
spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
much this exception:

> org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
subtype

could be due to casting compatible types from two different classloaders?

Just a thought but wanted to share as I think it's worth investigating.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou 
wrote:

> Hello,
>
> I have been facing the current issue for some time now and I was wondering
> if someone might have some insight on how I can resolve the following.
>
> The code (Java 11) is working correctly on my local machine, but whenever I
> try to launch the following on K8 I am getting the following error.
>
> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
> initializing SparkContext.
>
> java.util.ServiceConfigurationError:
> org.apache.spark.scheduler.ExternalClusterManager:
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
> subtype
>
>
>
> I have a Spark job that will monitor some directories and handle the data
> accordingly.
>
> That part is working correctly on K8 and the SparkContext has no issue
> being initialized there.
>
>
> This is the spark-submit for that
>
>
> spark-submit \
> --master=k8s://https://url:port \
> --deploy-mode cluster \
> --name a-name\
> --conf spark.driver.userClassPathFirst=true  \
> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
> --files "application-dev.properties,keystore.jks,truststore.jks"  \
> --conf spark.kubernetes.container.image=url/spark:spark-submit \
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
> --conf spark.kubernetes.namespace=spark \
> --conf spark.kubernetes.container.image.pullPolicy=Always \
> --conf spark.dynamicAllocation.enabled=false \
> --driver-memory 525m --executor-memory 525m \
> --num-executors 1 --executor-cores 1 \
> target/SparkStream.jar continuous-merge
>
>
> My issue comes when I try to launch the service in order to listen to
> kafka events and store them in HDFS.
>
>
> spark-submit \
> --master=k8s://https://url:port \
> --deploy-mode cluster \
> --name consume-data \
> --conf spark.driver.userClassPathFirst=true  \
> --conf spark.kubernetes.file.upload.path=hdfs://upload-path\
> --files "application-dev.properties,keystore.jks,truststore.jks"  \
> --jars 
> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
>  \
> --conf spark.kubernetes.container.image=url/spark:spark-submit \
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
> --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
> --conf spark.kubernetes.namespace=spark \
> --conf spark.kubernetes.container.image.pullPolicy=Always \
> --conf spark.dynamicAllocation.enabled=false \
> --driver-memory 1g --executor-memory 1g \
> --num-executors 1 --executor-cores 1 \
> target/SparkStream.jar consume
>
>
> It could be that I am launching the application wrongly or perhaps that my
> K8 is not configured correctly ?
>
>
>
> I have stripped down my code and left it barebone and will end up with the
> following issue :
>
>
> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
> initializing SparkContext.
>
> java.util.ServiceConfigurationError:
> org.apache.spark.scheduler.ExternalClusterManager:
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
> subtype
>
> at java.base/java.util.ServiceLoader.fail(Unknown Source)
>
> at
> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown
> Source)
>
> at
> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown
> Source)
>
> at java.base/java.util.ServiceLoader$

Re: Connection reset by peer : failed to remove cache rdd

2021-08-30 Thread Jacek Laskowski
Hi,

No idea what might be going on here, but I'd not worry much about it and
simply monitor disk usage, as some broadcast blocks might have been left over.

Do you know when in your application lifecycle it happens? Spark SQL or
Structured Streaming? Do you use broadcast variables or are the errors
coming from broadcast joins perhaps?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Mon, Aug 30, 2021 at 3:26 PM Harsh Sharma 
wrote:

> We are facing an issue in production where we are frequently getting
>
> "Still have 1 request outstanding when connection with the hostname was
> closed"
>
> and "connection reset by peer" errors, as well as warnings: "failed to remove
> cache rdd" or "failed to remove broadcast variable".
>
> Please help us mitigate this:
>
> Executor memory : 12g
>
> Network timeout :   60
>
> Heartbeat interval : 25
>
>
>
> [Stage 284:>(199 + 1) / 200][Stage 292:>  (1 + 3)
> / 200]
> [Stage 284:>(199 + 1) / 200][Stage 292:>  (2 + 3)
> / 200]
> [Stage 292:>  (2 + 4)
> / 200][14/06/21 10:46:17,006 WARN
> shuffle-server-4](TransportChannelHandler) Exception in connection from
> 
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
> at
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:748)
> [14/06/21 10:46:17,010 ERROR shuffle-server-4](TransportResponseHandler)
> Still have 1 requests outstanding when connection from  is closed
> [14/06/21 10:46:17,012 ERROR Spark Context Cleaner](ContextCleaner) Error
> cleaning broadcast 159
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
> at
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:748)
> [14/06/21 10:46:17,012 WARN
> block-manager-ask-thread-pool-69](BlockManagerMaster) Failed to remove
> broadcast 159 with removeFromMaster = true - Connection reset by peer
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.

Re: Java : Testing RDD aggregateByKey

2021-08-21 Thread Jacek Laskowski
Hi Pedro,

> Anyway, maybe the behavior is weird, I could expect that repartition to
zero was not allowed or at least warned about instead of just discarding all the
data.

Interesting...

scala> spark.version
res3: String = 3.1.2

scala> spark.range(5).repartition(0)
java.lang.IllegalArgumentException: requirement failed: Number of
partitions (0) must be positive.
  at scala.Predef$.require(Predef.scala:281)
  at
org.apache.spark.sql.catalyst.plans.logical.Repartition.<init>(basicLogicalOperators.scala:1032)
  at org.apache.spark.sql.Dataset.repartition(Dataset.scala:3016)
  ... 47 elided

How are the above different from yours?
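
For reference, a minimal sketch of the RDD-side call with zero partitions
(assuming Spark 3.x and a local master), where the result is silently empty
rather than an error because the shuffle ends up with no output partitions:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("zero-partitions").getOrCreate()
val sc = spark.sparkContext

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// With a positive number of partitions the aggregation works as expected.
pairs.aggregateByKey(0, 2)(_ + _, _ + _).collect()  // Array((a,3), (b,3)) -- order may vary

// With 0 partitions, HashPartitioner(0) is accepted and the resulting shuffle
// has no output partitions, so all data is silently discarded.
pairs.aggregateByKey(0, 0)(_ + _, _ + _).collect()  // Array()

spark.stop()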

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, Aug 19, 2021 at 5:43 PM Pedro Tuero  wrote:

> Hi, I'm sorry, the problem was really silly: in the test the number of
> partitions was zero (it was a division of the original number of
> partitions of the RDD source, and in the test that number was just one) and
> that's why the test was failing.
> Anyway, maybe the behavior is weird, I could expect that repartition to
> zero was not allowed or at least warned about instead of just discarding all the
> data.
>
> Thanks for your time!
> Regards,
> Pedro
>
> El jue, 19 de ago. de 2021 a la(s) 07:42, Jacek Laskowski (ja...@japila.pl)
> escribió:
>
>> Hi Pedro,
>>
>> No idea what might be causing it. Do you perhaps have some code to
>> reproduce it locally?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>>
>> On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero  wrote:
>>
>>>
>>> Context: spark-core_2.12-3.1.1
>>> Testing with maven and eclipse.
>>>
>>> I'm modifying a project and a test stops working as expected.
>>> The difference is in the parameters passed to the function
>>> aggregateByKey of JavaPairRDD.
>>>
>>> JavaSparkContext is created this way:
>>> new JavaSparkContext(new SparkConf()
>>> .setMaster("local[1]")
>>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
>>> Then I construct a JavaPairRdd using sparkContext.paralellizePairs and
>>> call a method which makes an aggregateByKey over the input JavaPairRDD  and
>>> test that the result is the expected.
>>>
>>> When I use JavaPairRDD line 369 (doing .aggregateByKey(zeroValue,
>>> combiner, merger);
>>>  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
>>> combFunc: JFunction2[U, U, U]):
>>>   JavaPairRDD[K, U] = {
>>> implicit val ctag: ClassTag[U] = fakeClassTag
>>> fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>>>   }
>>> The test works as expected.
>>> But when I use: JavaPairRDD line 355 (doing .aggregateByKey(zeroValue,
>>> *partitions*,combiner, merger);)
>>> def aggregateByKey[U](zeroValue: U, *numPartitions: Int,* seqFunc:
>>> JFunction2[U, V, U],
>>>   combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
>>> implicit val ctag: ClassTag[U] = fakeClassTag
>>> fromRDD(rdd.aggregateByKey(zeroValue, *numPartitions)*(seqFunc,
>>> combFunc))
>>>   }
>>> The result is always empty. It looks like there is a problem with the
>>> hashPartitioner created at PairRddFunctions :
>>>  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions:
>>> Int)(seqOp: (U, V) => U,
>>>   combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>> aggregateByKey(zeroValue, *new HashPartitioner(numPartitions)*)(seqOp,
>>> combOp)
>>>   }
>>> vs:
>>>  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>>>   combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
>>> aggregateByKey(zeroValue, *defaultPartitioner*(self))(seqOp, combOp)
>>>   }
>>> I can't debug it properly with eclipse, and error occurs when threads
>>> are in spark code (system editor can only open file base resources).
>>>
>>> Does anyone know how to resolve this issue?
>>>
>>> Thanks in advance,
>>> Pedro.
>>>
>>>
>>>
>>>


Re: Is memory-only no-disk Spark possible?

2021-08-21 Thread Jacek Laskowski
Hi Bobby,

What a great summary of what happens behind the scenes! Enjoyed every
sentence!

"The default shuffle implementation will always write out to disk." <--
that's what I wasn't sure about the most. Thanks again!

/me On digging deeper...

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Fri, Aug 20, 2021 at 4:27 PM Bobby Evans  wrote:

> On the data path, Spark will write to a local disk when it runs out of
> memory and needs to spill or when doing a shuffle with the default shuffle
> implementation.  The spilling is a good thing because it lets you process
> data that is too large to fit in memory.  It is not great because the
> processing slows down a lot when that happens, but slow is better than
> crashing in many cases. The default shuffle implementation will
> always write out to disk.  This again is good in that it allows you to
> process more data on a single box than can fit in memory. It is bad when
> the shuffle data could fit in memory, but ends up being written to disk
> anyways.  On Linux the data is being written into the page cache and will
> be flushed to disk in the background when memory is needed or after a set
> amount of time. If your query is fast and is shuffling little data, then it
> is likely that your query is running all in memory.  All of the shuffle
> reads and writes are probably going directly to the page cache and the disk
> is not involved at all. If you really want to you can configure the
> pagecache to not spill to disk until absolutely necessary. That should get
> you really close to pure in-memory processing, so long as you have enough
> free memory on the host to support it.
>
> Bobby
>
>
>
> On Fri, Aug 20, 2021 at 7:57 AM Mich Talebzadeh 
> wrote:
>
>> Well I don't know what having an "in-memory Spark only" is going to
>> achieve. Spark GUI shows the amount of disk usage pretty well. The memory
>> is used exclusively by default first.
>>
>> Spark is no different from a predominantly in-memory application.
>> Effectively it is doing the classical disk based hadoop  map-reduce
>> operation "in memory" to speed up the processing but it is still an
>> application on top of the OS.  So like most applications, there is a state
>> of Spark, the code running and the OS(s), where disk usage will be needed.
>>
>> This is akin to swap space on OS itself and I quote "Swap space is used when
>> your operating system decides that it needs physical memory for active
>> processes and the amount of available (unused) physical memory is
>> insufficient. When this happens, inactive pages from the physical memory
>> are then moved into the swap space, freeing up that physical memory for
>> other uses"
>>
>>  free
>>   totalusedfree  shared  buff/cache
>>  available
>> Mem:   6565973230116700 1429436 234177234113596
>> 32665372
>> Swap: 104857596  550912   104306684
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 20 Aug 2021 at 12:50, Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> I've been exploring BlockManager and the stores for a while now and am
>>> tempted to say that a memory-only Spark setup would be possible (except
>>> shuffle blocks). Is this correct?
>>>
>>> What about shuffle blocks? Do they have to be stored on disk (in
>>> DiskStore)?
>>>
>>> I think broadcast variables are in-memory first so except on-disk
>>> storage level explicitly used (by Spark devs), there's no reason not to
>>> have Spark in-memory only.
>>>
>>> (I was told that one of the differences between Trino/Presto vs Spark
>>> SQL is that Trino keeps all processing in-memory only and will blow up
>>> while Spark uses disk to avoid OOMEs).
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> <https://twitter.com/jaceklaskowski>
>>>
>>


Is memory-only no-disk Spark possible?

2021-08-20 Thread Jacek Laskowski
Hi,

I've been exploring BlockManager and the stores for a while now and am
tempted to say that a memory-only Spark setup would be possible (except
shuffle blocks). Is this correct?

What about shuffle blocks? Do they have to be stored on disk (in DiskStore)?

I think broadcast variables are in-memory first so except on-disk storage
level explicitly used (by Spark devs), there's no reason not to have Spark
in-memory only.

(I was told that one of the differences between Trino/Presto vs Spark SQL
is that Trino keeps all processing in-memory only and will blow up while
Spark uses disk to avoid OOMEs).
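
On the caching side (as opposed to shuffle blocks), a minimal sketch of keeping
a dataset strictly in memory with no disk fallback (assuming a running
SparkSession, e.g. in spark-shell):

import org.apache.spark.storage.StorageLevel

// Unlike the Dataset default (MEMORY_AND_DISK), MEMORY_ONLY never writes cached
// partitions to the local disk store; partitions that don't fit are recomputed.
val cached = spark.range(1000000).toDF("id").persist(StorageLevel.MEMORY_ONLY)
cached.count()  // materializes the cache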

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


Re: Java : Testing RDD aggregateByKey

2021-08-19 Thread Jacek Laskowski
Hi Pedro,

No idea what might be causing it. Do you perhaps have some code to
reproduce it locally?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Tue, Aug 17, 2021 at 4:14 PM Pedro Tuero  wrote:

>
> Context: spark-core_2.12-3.1.1
> Testing with maven and eclipse.
>
> I'm modifying a project and a test stops working as expected.
> The difference is in the parameters passed to the function aggregateByKey
> of JavaPairRDD.
>
> JavaSparkContext is created this way:
> new JavaSparkContext(new SparkConf()
> .setMaster("local[1]")
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"));
> Then I construct a JavaPairRdd using sparkContext.paralellizePairs and
> call a method which makes an aggregateByKey over the input JavaPairRDD  and
> test that the result is the expected.
>
> When I use JavaPairRDD line 369 (doing .aggregateByKey(zeroValue,
> combiner, merger);
>  def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U],
> combFunc: JFunction2[U, U, U]):
>   JavaPairRDD[K, U] = {
> implicit val ctag: ClassTag[U] = fakeClassTag
> fromRDD(rdd.aggregateByKey(zeroValue)(seqFunc, combFunc))
>   }
> The test works as expected.
> But when I use: JavaPairRDD line 355 (doing .aggregateByKey(zeroValue,
> *partitions*,combiner, merger);)
> def aggregateByKey[U](zeroValue: U, *numPartitions: Int,* seqFunc:
> JFunction2[U, V, U],
>   combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] = {
> implicit val ctag: ClassTag[U] = fakeClassTag
> fromRDD(rdd.aggregateByKey(zeroValue, *numPartitions)*(seqFunc,
> combFunc))
>   }
> The result is always empty. It looks like there is a problem with the
> hashPartitioner created at PairRddFunctions :
>  def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp:
> (U, V) => U,
>   combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
> aggregateByKey(zeroValue, *new HashPartitioner(numPartitions)*)(seqOp,
> combOp)
>   }
> vs:
>  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>   combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
> aggregateByKey(zeroValue, *defaultPartitioner*(self))(seqOp, combOp)
>   }
> I can't debug it properly with eclipse, and error occurs when threads are
> in spark code (system editor can only open file base resources).
>
> Does anyone know how to resolve this issue?
>
> Thanks in advance,
> Pedro.
>
>
>
>


Re: [ANNOUNCE] Apache Spark 3.1.2 released

2021-06-02 Thread Jacek Laskowski
Big shout-out to you, Dongjoon! Thank you.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Jun 2, 2021 at 2:59 AM Dongjoon Hyun 
wrote:

> We are happy to announce the availability of Spark 3.1.2!
>
> Spark 3.1.2 is a maintenance release containing stability fixes. This
> release is based on the branch-3.1 maintenance branch of Spark. We strongly
> recommend all 3.1 users to upgrade to this stable release.
>
> To download Spark 3.1.2, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-1-2.html
>
> We would like to acknowledge all community members for contributing to this
> release. This release would not have been possible without you.
>
> Dongjoon Hyun
>


Re: Updating spark-env.sh per application

2021-05-09 Thread Jacek Laskowski
Hi,

The easiest (but perhaps not necessarily the most flexible) is simply to
use two different versions of spark-submit script with the env var set to
two different values. Have you tried it yet?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Sun, May 9, 2021 at 5:32 AM Renu Yadav  wrote:

> Hi Mich,
>
> In spark-env.sh, SPARK_DIST_CLASSPATH is set. I want to override this
> variable at runtime, as I want to exclude one library class from it.
>
>
>
> On Fri, 7 May, 2021, 6:51 pm Mich Talebzadeh, 
> wrote:
>
>> Hi,
>>
>> Environment variables are read in when spark-submit kicks off. What
>> exactly do you need to refresh at the application level?
>>
>> HTH
>>
>> On Fri, 7 May 2021 at 11:34, Renu Yadav  wrote:
>>
>>>   Hi Team,
>>>
>>> Is it possible to override the variable of spark-env.sh on application
>>> level ?
>>>
>>> Thanks & Regards,
>>> Renu Yadav
>>>
>>>
>>> On Fri, May 7, 2021 at 12:16 PM Renu Yadav  wrote:
>>>
>>>> Hi Team,
>>>>
>>>> Is it possible to override the variable of spark-env.sh on application
>>>> level ?
>>>>
>>>> Thanks & Regards,
>>>> Renu Yadav
>>>>
>>>> --
>>
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Jacek Laskowski
Hi,

Just to add to Gabor's excellent answer: checkpointing and offsets
are infrastructure-related and should not really be in the hands of Spark
devs, who should instead focus on the business purpose of the code (not on
offsets, which are very low-level and not really that important).

BTW, that's what happens in Kafka Streams too.
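
For illustration, a minimal sketch of a Kafka-to-console query where offsets
live entirely in the checkpoint (broker address, topic name and checkpoint
path are made up; the spark-sql-kafka-0-10 package is assumed to be on the
classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-offsets-demo").getOrCreate()

// Offsets are tracked in the checkpoint, not committed back to Kafka by user code.
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // made-up broker address
  .option("subscribe", "events")                        // made-up topic name
  .option("includeHeaders", "true")                     // exposes Kafka headers (Spark 3.0+)
  .load()

val query = input
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-offsets-demo")  // made-up path
  .start()

// On restart with the same checkpointLocation, the query resumes from the recorded offsets.
query.awaitTermination()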

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi 
wrote:

> There is no way to store offsets in Kafka and restart from the stored
> offset. Structured Streaming stores offsets in the checkpoint and it restarts
> from there without any user code.
>
> Offsets can be stored with a listener but it can be only used for lag
> calculation.
>
> BR,
> G
>
>
> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>
>> Hello,
>>
>> I was reading the spark docs about spark structured streaming, since we
>> are thinking about updating our code base that today uses Dstreams, hence
>> spark streaming. Also, one main reason for this change that we want to
>> realize is that reading headers in kafka messages is only supported in
>> spark structured streaming and not in Dstreams.
>>
>> I was surprised to not see an obvious way to handle manually the offsets
>> by committing the offsets to kafka. In spark streaming we used to do it
>> with something similar to these lines of code:
>>
>> stream.foreachRDD { rdd =>
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>   // some time later, after outputs have completed
>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>>
>>
>> And this works perfectly ! Especially, this works very nice in case of
>> job failure/restart... I am wondering how this can be achieved in spark
>> structured streaming ?
>>
>> I read about checkpoints, and this reminds me the old way of doing things
>> in spark 1.5/kafka0.8 and is not perfect since we are not deciding when to
>> commit offsets by ourselves.
>>
>> Did I miss anything ? What would be the best way of committing offsets to
>> kafka with spark structured streaming to the concerned consumer group ?
>>
>> Best regards,
>> Ali Gouta.
>>
>


Re: Writing to Google Cloud Storage with v2 algorithm safe?

2021-04-04 Thread Jacek Laskowski
Hi Vaquar,

Thanks a lot! Accepted as the answer (yet there was the other answer that
was very helpful too). Tons of reading ahead to understand it more.

That once again makes me feel that Hadoop MapReduce experience would help a
great deal (and I've got none).

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Sun, Apr 4, 2021 at 7:28 AM vaquar khan  wrote:

> Hi Jacek,
>
> I have answered; hope you find it useful.
>
> Regards,
> Viquar khan
>
> On Sat, Apr 3, 2021 at 11:19 AM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> I've just posted a question on StackOverflow [1] about the safety of the
>> v2 algorithm while writing out to Google Cloud Storage. I think I'm missing
>> some fundamentals on how cloud object stores work (GCS in particular) and
>> hence the question.
>>
>> Is this all about File.rename and how many HTTP calls are there under the
>> covers? How to know it for GCS?
>>
>> Thank you for any help you can provide. Merci beaucoup mes amis :)
>>
>> [1] https://stackoverflow.com/q/66933229/1305344
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>
>
> --
> Regards,
> Vaquar Khan
> +1 -224-436-0783
> Greater Chicago
>


Re: Source.getBatch and schema vs qe.analyzed.schema?

2021-04-03 Thread Jacek Laskowski
Hi Bartosz,

This is not a question about whether the data source supports fixed or
user-defined schema but what schema to use when requested for a streaming
batch in Source.getBatch.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Mar 31, 2021 at 7:44 PM Bartosz Konieczny 
wrote:

> Hi Jacek,
>
> An interesting question! I don't know the exact answer and will be happy
> to learn by the way :) Below you can find my understanding for these 2
> things, hoping it helps a little.
>
> For me, we can distinguish 2 different source categories. The first of
> them is a source with some fixed schema. A good example is Apache Kafka
> which exposes the topic name, key, value and you can't change that; it's
> always the same, whenever you run the reader in Company A or in Company B.
> What changes is the data extraction logic from the key, value or headers.
> But it's business-specific, not data store-specific. You can find the
> schema implementation here: Kafka
> <https://github.com/apache/spark/blob/v3.1.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala#L122>
>
> The second type is a source with user-defined schema, like a RDBMS table
> or a NoSQL schemaless store. Here, predicting the schema will not only be
> business-specific, but also data store-specific; you can set any name for a
> Primary Key column, there is no such rule like "key" or "value" in Kafka.
> To avoid runtime errors (= favor fail-fast approach before the data is
> read), Spark can use the metadata to assert (analyze) the schema specified
> by the user to confirm it or fail fast before reading the data.
>
> Best,
> Bartosz.
>
>
>
> On Mon, Mar 29, 2021 at 1:07 PM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> I've been developing a data source with a source and sink for Spark
>> Structured Streaming.
>>
>> I've got a question about Source.getBatch [1]:
>>
>> def getBatch(start: Option[Offset], end: Offset): DataFrame
>>
>> getBatch returns a streaming DataFrame between the offsets so the idiom
>> (?) is to have a code as follows:
>>
>> val relation = new MyRelation(...)(sparkSession)
>> val plan = LogicalRelation(relation, isStreaming = true)
>> new Dataset[Row](sparkSession, plan, RowEncoder(schema))
>>
>> Note the use of schema [2] that is another part of the Source abstraction:
>>
>> def schema: StructType
>>
>> This is the "source" of my question. Is the above OK in a streaming sink
>> / Source.getBatch?
>>
>> Since there are no interim operators that could change attributes
>> (schema) I think it's OK.
>>
>> I've seen the following code and that made me wonder whether it's better
>> or not compared to the solution above:
>>
>> val relation = new MyRelation(...)(sparkSession)
>> val plan = LogicalRelation(relation, isStreaming = true)
>>
>> // When would we have to execute plan?
>> val qe = sparkSession.sessionState.executePlan(plan)
>> new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema))
>>
>> When would or do we have to use qe.analyzed.schema vs simply schema?
>> Could this qe.analyzed.schema help avoid some edge cases and is a preferred
>> approach?
>>
>> Thank you for any help you can offer. Much appreciated.
>>
>> [1]
>> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L61
>> [2]
>> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L35
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>
>
> --
> Bartosz Konieczny
> data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode
>
>


Re: Writing to Google Cloud Storage with v2 algorithm safe?

2021-04-03 Thread Jacek Laskowski
Hi,

From
https://spark.apache.org/docs/3.1.1/cloud-integration.html#recommended-settings-for-writing-to-object-stores
:

> For object stores whose consistency model means that rename-based commits
are safe use the FileOutputCommitter v2 algorithm for performance; v1 for
safety.

These are "safe" and "safety" meanings.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Sat, Apr 3, 2021 at 7:49 PM Mich Talebzadeh 
wrote:

> Hi Jacek,
>
> Can you please clarify your question?
>
> with regard to your point:
>
> "... I think I'm missing some fundamentals on how cloud object stores work
> (GCS in particular) and hence the question."
>
> The end result is the safe storage of data in object storage in GCP right?
>
> HTH
>
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 3 Apr 2021 at 17:13, Jacek Laskowski  wrote:
>
>> Hi,
>>
>> I've just posted a question on StackOverflow [1] about the safety of the
>> v2 algorithm while writing out to Google Cloud Storage. I think I'm missing
>> some fundamentals on how cloud object stores work (GCS in particular) and
>> hence the question.
>>
>> Is this all about File.rename and how many HTTP calls are there under the
>> covers? How to know it for GCS?
>>
>> Thank you for any help you can provide. Merci beaucoup mes amis :)
>>
>> [1] https://stackoverflow.com/q/66933229/1305344
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>


Writing to Google Cloud Storage with v2 algorithm safe?

2021-04-03 Thread Jacek Laskowski
Hi,

I've just posted a question on StackOverflow [1] about the safety of the v2
algorithm while writing out to Google Cloud Storage. I think I'm missing
some fundamentals on how cloud object stores work (GCS in particular) and
hence the question.

Is this all about File.rename and how many HTTP calls are there under the
covers? How to know it for GCS?

Thank you for any help you can provide. Merci beaucoup mes amis :)

[1] https://stackoverflow.com/q/66933229/1305344

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


Source.getBatch and schema vs qe.analyzed.schema?

2021-03-29 Thread Jacek Laskowski
Hi,

I've been developing a data source with a source and sink for Spark
Structured Streaming.

I've got a question about Source.getBatch [1]:

def getBatch(start: Option[Offset], end: Offset): DataFrame

getBatch returns a streaming DataFrame between the offsets so the idiom (?)
is to have a code as follows:

val relation = new MyRelation(...)(sparkSession)
val plan = LogicalRelation(relation, isStreaming = true)
new Dataset[Row](sparkSession, plan, RowEncoder(schema))

Note the use of schema [2] that is another part of the Source abstraction:

def schema: StructType

This is the "source" of my question. Is the above OK in a streaming sink /
Source.getBatch?

Since there are no interim operators that could change attributes (schema)
I think it's OK.

I've seen the following code and that made me wonder whether it's better or
not compared to the solution above:

val relation = new MyRelation(...)(sparkSession)
val plan = LogicalRelation(relation, isStreaming = true)

// When would we have to execute plan?
val qe = sparkSession.sessionState.executePlan(plan)
new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema))

When would or do we have to use qe.analyzed.schema vs simply schema? Could
this qe.analyzed.schema help avoid some edge cases and is a preferred
approach?

Thank you for any help you can offer. Much appreciated.

[1]
https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L61
[2]
https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L35

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


Re: [k8s] PersistentVolumeClaim support in 3.1.1 on minikube

2021-03-15 Thread Jacek Laskowski
Hi,

I think I found it. I should be using the OnDemand claim name so that it gets
replaced with a unique claim per executor (?)
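
In other words, a sketch of the executor-side settings with the special
OnDemand claim name, expressed on a SparkConf instead of spark-shell --conf
flags (the volume name "data", storage class, size and mount path below are
made up):

import org.apache.spark.SparkConf

// OnDemand makes Spark create a dedicated PVC per executor instead of
// reusing one fixed, pre-created claim.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", "OnDemand")
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass", "standard")
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit", "1Gi")
  .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", "/data")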

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Mon, Mar 15, 2021 at 8:36 PM Jacek Laskowski  wrote:

> Hi,
>
> I've been toying with persistent volumes using Spark 3.1.1 on minikube and
> am wondering whether it's a supported platform. I'd not be surprised if not
> given all the surprises I've been experiencing lately.
>
> Can I use spark-shell or any Spark app in client mode with PVCs with the
> default 2 executors? Should the following work if I removed --num-executors
> 1?
>
> ./bin/spark-shell \
>   --master k8s://$K8S_SERVER \
>   --num-executors 1 \
>   --conf
> spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.mount.path=$MOUNT_PATH
> \
>   --conf
> spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.claimName=$PVC_CLAIM_NAME
> \
>   --conf
> spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.storageClass=$PVC_STORAGE_CLASS
> \
>   --conf
> spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.sizeLimit=$PVC_SIZE_LIMIT
> \
>   --conf spark.kubernetes.container.image=$IMAGE_NAME \
>   --conf spark.kubernetes.context=minikube \
>   --conf spark.kubernetes.namespace=spark-demo \
>   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>   --verbose
>
> The whole demo is available at
> https://jaceklaskowski.github.io/spark-kubernetes-book/demo/persistentvolumeclaims/
>
> Please help. Thank you!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
> <https://twitter.com/jaceklaskowski>
>


[k8s] PersistentVolumeClaim support in 3.1.1 on minikube

2021-03-15 Thread Jacek Laskowski
Hi,

I've been toying with persistent volumes using Spark 3.1.1 on minikube and
am wondering whether it's a supported platform. I'd not be surprised if not
given all the surprises I've been experiencing lately.

Can I use spark-shell or any Spark app in client mode with PVCs with the
default 2 executors? Should the following work if I removed --num-executors
1?

./bin/spark-shell \
  --master k8s://$K8S_SERVER \
  --num-executors 1 \
  --conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.mount.path=$MOUNT_PATH
\
  --conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.claimName=$PVC_CLAIM_NAME
\
  --conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.storageClass=$PVC_STORAGE_CLASS
\
  --conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.$VOLUME_NAME.options.sizeLimit=$PVC_SIZE_LIMIT
\
  --conf spark.kubernetes.container.image=$IMAGE_NAME \
  --conf spark.kubernetes.context=minikube \
  --conf spark.kubernetes.namespace=spark-demo \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --verbose

The whole demo is available at
https://jaceklaskowski.github.io/spark-kubernetes-book/demo/persistentvolumeclaims/

Please help. Thank you!

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


Re: Spark 3.0.1 | Volume to use For Spark Kubernetes Executor Part Files Storage

2021-03-08 Thread Jacek Laskowski
Hi,

On GCP I'd go for buckets in Google Storage. Not sure how reliable it is in
production deployments though. Only demo experience here.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Mon, Mar 8, 2021 at 12:33 PM Ranju Jain  wrote:

> Hi Jacek,
>
>
>
> I am using this property
> spark.kubernetes.executor.deleteOnTermination=true only to troubleshoot
> else I am freeing up resources after executors complete their job.
>
> Now I want to use some Shared storage which can be shared by all executors
> to write the part files.
>
> Which Kubernetes Storage I should go for?
>
>
>
> Regards
>
> Ranju
>
> *From:* Jacek Laskowski 
> *Sent:* Monday, March 8, 2021 4:14 PM
> *To:* Ranju Jain 
> *Cc:* Attila Zsolt Piros ;
> user@spark.apache.org
> *Subject:* Re: Spark 3.0.1 | Volume to use For Spark Kubernetes Executor
> Part Files Storage
>
>
>
> Hi,
>
>
>
> > as Executors terminates after their work completes.
>
>
>
> --conf spark.kubernetes.executor.deleteOnTermination=false ?
>
>
> Pozdrawiam,
>
> Jacek Laskowski
>
> 
>
> https://about.me/JacekLaskowski
>
> "The Internals Of" Online Books
> <https://protect2.fireeye.com/v1/url?k=901b36bb-cf800fbe-901b7620-86d2114eab2f-836d4bb779fe8f92=1=d3b471ef-b3ce-4cfd-9bcd-f42590d0f10b=https%3A%2F%2Fbooks.japila.pl%2F>
>
> Follow me on https://twitter.com/jaceklaskowski
>
>
> <https://twitter.com/jaceklaskowski>
>
>
>
>
>
> On Sun, Mar 7, 2021 at 5:23 PM Ranju Jain 
> wrote:
>
> Hi,
>
>
>
> I need to save the Executors processed data in the form of part files ,
> but I think persistent Volume is not an option for this as Executors
> terminates after their work completes.
>
> So I am thinking to use shared volume across executor pods.
>
>
>
> Should I go with NFS or is there any other Volume option as well to
> explore?
>
>
>
> Regards
>
> Ranju
>
>


Re: Spark 3.0.1 | Volume to use For Spark Kubernetes Executor Part Files Storage

2021-03-08 Thread Jacek Laskowski
Hi,

> as Executors terminates after their work completes.

--conf spark.kubernetes.executor.deleteOnTermination=false ?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Sun, Mar 7, 2021 at 5:23 PM Ranju Jain 
wrote:

> Hi,
>
>
>
> I need to save the Executors processed data in the form of part files ,
> but I think persistent Volume is not an option for this as Executors
> terminates after their work completes.
>
> So I am thinking to use shared volume across executor pods.
>
>
>
> Should I go with NFS or is there any other Volume option as well to
> explore?
>
>
>
> Regards
>
> Ranju
>


Re: Spark Kubernetes 3.0.1 | podcreationTimeout not working

2021-02-10 Thread Jacek Laskowski
Hi Ranju,

Can you show the pods and their state? Does the situation happen at the
very beginning of spark-submit or some time later (once you've got a couple
of executors)? My guess is that either the driver or the executors cannot
start up due to a lack of resources. In either case the pods are not deleted
as they simply keep waiting forever. I might be mistaken here though.

Which property does "this timeout of 60 sec." refer to?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Feb 10, 2021 at 2:03 PM Ranju Jain 
wrote:

> Hi,
>
>
>
> I submitted the spark job and pods goes in Pending state because of
> insufficient resources.
>
> But they are not getting deleted after this timeout of 60 sec. Please help
> me in understanding.
>
>
>
> Regards
>
> Ranju
>


Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files

2021-01-22 Thread Jacek Laskowski
Hi Filip,

Care to share the code behind "The only thing I found so far involves using
forEachBatch and manually updating my aggregates. "?

I'm not completely sure I understand your use case and hope the code could
shed more light on it. Thank you.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, Jan 21, 2021 at 5:05 PM Filip 
wrote:

> Hi,
>
> I'm considering using Apache Spark for the development of an application.
> This would replace a legacy program which reads CSV files and does lots
> (tens/hundreds) of aggregations on them. The aggregations are fairly
> simple:
> counts, sums, etc. while applying some filtering conditions on some of the
> columns.
>
> I prefer using structured streaming for its simplicity and low-latency. I'd
> also like to use full SQL queries (via createOrReplaceTempView). However,
> doing multiple queries means Spark will re-read the input files for each
> one
> of them. This seems very inefficient for my use-case.
>
> Does anyone have any suggestions? The only thing I found so far involves
> using forEachBatch and manually updating my aggregates. But, I think there
> should be a simpler solution for this use case.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Column-level encryption in Spark SQL

2021-01-21 Thread Jacek Laskowski
Hi,

Never heard of it (and I was once tasked with exploring a similar use case).
I'm curious how you'd like it to work. (No idea how Hive does this either.)

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Sat, Dec 19, 2020 at 2:38 AM john washington 
wrote:

> Dear Spark team members,
>
> Can you please advise if Column-level encryption is available in Spark SQL?
> I am aware that HIVE supports column level encryption.
>
> Appreciate your response.
>
> Thanks,
> John
>


Re: Process each kafka record for structured streaming

2021-01-21 Thread Jacek Laskowski
Hi,

Can you use the console sink and make sure that the pipeline shows some
progress?
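
A quick sketch in Scala of what I mean (broker and topic names are made up):
swap the foreach sink for the console sink and a micro-batch trigger just to
confirm the query makes any progress at all.

import org.apache.spark.sql.streaming.Trigger

val kafkaStreamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

val query = kafkaStreamDF.writeStream
  .format("console")
  .outputMode("append")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()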

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Jan 20, 2021 at 10:44 AM rajat kumar 
wrote:

> Hi,
>
> I want to apply custom logic for each row of data I am getting through
> kafka and want to do it with microbatch.
> When I am running it , it is not progressing.
>
>
> kafka_stream_df \
> .writeStream \
> .foreach(process_records) \
> .outputMode("append") \
> .option("checkpointLocation", "checkpt") \
> .trigger(continuous="5 seconds").start()
>
> Regards
>
> Rajat
>
>
>


Re: Application Timeout

2021-01-21 Thread Jacek Laskowski
Hi Brett,

No idea why it happens, but got curious about this "Cores" column being 0.
Is this always the case?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Tue, Jan 19, 2021 at 11:27 PM Brett Spark  wrote:

> Hello!
> When using Spark Standalone & Spark 2.4.4 / 3.0.0 - we are seeing our
> standalone Spark "applications" timeout and show as "Finished" after around
> an hour of time.
>
> Here is a screenshot from the Spark master before it's marked as finished.
> [image: image.png]
> Here is a screenshot from the Spark master after it's marked as finished.
> (After over an hour of idle time).
> [image: image.png]
> Here are the logs from the Spark Master / Worker:
>
> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master
> 2021-01-19 21:55:47,282 INFO master.Master: 172.32.3.66:34570 got
> disassociated, removing it.
> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master
> 2021-01-19 21:55:52,095 INFO master.Master: 172.32.115.115:36556 got
> disassociated, removing it.
> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master
> 2021-01-19 21:55:52,095 INFO master.Master: 172.32.115.115:37305 got
> disassociated, removing it.
> spark-master-2d733568b2a7e82de7b2b09b6daa17e9-7cd4cfcddb-f84q7 master
> 2021-01-19 21:55:52,096 INFO master.Master: Removing app
> app-20210119204911-
> spark-worker-2d733568b2a7e82de7b2b09b6daa17e9-7bbb75f9b6-8mv2b worker
> 2021-01-19 21:55:52,112 INFO shuffle.ExternalShuffleBlockResolver:
> Application app-20210119204911- removed, cleanupLocalDirs = true
>
> Is there a setting that causes an application to timeout after an hour of
> a Spark application or Spark worker being idle?
>
> I would like to keep our Spark applications alive as long as possible.
>
> I haven't been able to find a setting in the Spark confs documentation
> that corresponds to this so i'm wondering if this is something that's hard
> coded.
>
> Please let me know,
> Thank you!
>


Re: Only one Active task in Spark Structured Streaming application

2021-01-21 Thread Jacek Laskowski
Hi,

I'd look at the stages and jobs as it's possible that the only running task
is the last missing one in a stage of some job. Just guessing...

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, Jan 21, 2021 at 12:19 PM Eric Beabes 
wrote:

> Hello,
>
> My Spark Structured Streaming application was performing well for quite
> some time but all of a sudden from today it has slowed down. I noticed in
> the Spark UI that the 'No. of Active Tasks' is 1 even though 64 Cores are
> available. (Please see the attached image).
>
> I don't believe there's any data skew issue related to partitioning of
> data. What could be the reason for this? Please advise. Thanks.
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Query on entrypoint.sh Kubernetes spark

2021-01-21 Thread Jacek Laskowski
Hi,

I'm a beginner in Spark on Kubernetes so bear with me and watch out for
possible mistakes :)

The key to understanding entrypoint.sh and --deploy-mode client is to think
about the environment the script is executed in. That's Kubernetes already,
where the Docker image is brought to life as the container of a driver pod.
From there, there's no point using cluster deploy mode...ever. Makes sense?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, Jan 21, 2021 at 10:40 AM Sachit Murarka 
wrote:

> Hi All,
>
> To run spark on kubernetes . I see following lines in entrypoint.sh script
> available
>
> case "$1" in
>   driver)
> shift 1
> CMD=(
>   "$SPARK_HOME/bin/spark-submit"
>   --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
>   --deploy-mode client
>
> Could you pls suggest Why deploy-mode client is mentioned in entrypoint.sh
> ?
> I am running spark submit using deploy mode cluster but inside
> entrypoint.sh which it is mentioned like that.
>
>
> Kind Regards,
> Sachit Murarka
>


Re: RDD filter in for loop gave strange results

2021-01-20 Thread Jacek Laskowski
Hi Marco,

A Scala dev here.

In short: yet another reason against Python :)

Honestly, I've got no idea why the code gives this output. I ran it with
3.1.1-rc1 and got the very same results. Hoping the PySpark/Python devs will
chime in and shed more light on this.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Jan 20, 2021 at 2:07 PM Marco Wong  wrote:

> Dear Spark users,
>
> I ran the Python code below on a simple RDD, but it gave strange results.
> The filtered RDD contains non-existent elements which were filtered away
> earlier. Any idea why this happened?
> ```
> rdd = spark.sparkContext.parallelize([0,1,2])
> for i in range(3):
> print("RDD is ", rdd.collect())
> print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
> rdd = rdd.filter(lambda x:x!=i)
> print("Result is ", rdd.collect())
> print()
> ```
> which gave
> ```
> RDD is  [0, 1, 2]
> Filtered RDD is  [1, 2]
> Result is  [1, 2]
>
> RDD is  [1, 2]
> Filtered RDD is  [0, 2]
> Result is  [0, 2]
>
> RDD is  [0, 2]
> Filtered RDD is  [0, 1]
> Result is  [0, 1]
> ```
>
> Thanks,
>
> Marco
>


Re: Spark RDD + HBase: adoption trend

2021-01-20 Thread Jacek Laskowski
Hi Marco,

IMHO the RDD API is only for very sophisticated use cases that very few Spark
devs would be capable of handling. I consider the RDD API a sort of Spark
assembler, and most Spark devs should stick to the Dataset API.

Speaking of HBase, see
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/bigtable/spark
where you can find a demo that I worked on last year and made sure that:

"Apache HBase™ Spark Connector implements the DataSource API for Apache
HBase and allows executing relational queries on data stored in Cloud
Bigtable."

That makes hbase-rdd even more obsolete but not necessarily unusable (I am too
little skilled in the HBase space to comment further on this).

I think you should consider merging your hbase-rdd project with the official
Apache HBase™ Spark Connector at
https://github.com/apache/hbase-connectors/tree/master/spark (as they seem
to lack active development IMHO).

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Jan 20, 2021 at 2:44 PM Marco Firrincieli 
wrote:

> Hi, my name is Marco and I'm one of the developers behind
> https://github.com/unicredit/hbase-rdd
> a project we are currently reviewing for various reasons.
>
> We were basically wondering if RDD "is still a thing" nowadays (we see
> lots of usage for DataFrames or Datasets) and we're not sure how much of
> the community still works/uses RDDs.
>
> Also, for lack of time, we always mainly worked using Cloudera-flavored
> Hadoop/HBase & Spark versions. We were thinking the community would then
> help us organize the project in a more "generic" way, but that didn't
> happen.
>
> So I figured I would ask here what is the gut feeling of the Spark
> community so to better define the future of our little library.
>
> Thanks
>
> -Marco
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Event Log Forwarding and Offset Tracking

2021-01-17 Thread Jacek Laskowski
Hi,

> Forwarding Spark Event Logs to identify critical events like job start,
executor failures, job failures etc to ElasticSearch via log4j. However I
could not find any way to foward event log via log4j configurations. Is
there any other recommended approach to track these application events?

I'd use SparkListener API (
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/scheduler/SparkListener.html
)

> 2 - For Spark streaming jobs, is there any way to identify that data from
Kafka is not consumed for whatever reason, or the offsets are not
progressing as expected and also forward that to ElasticSearch via log4j
for monitoring

Think SparkListener API would help here too.
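
A bare-bones sketch of the idea (the logger name and messages are made up): a
listener registered on the driver that turns job and executor lifecycle events
into regular log4j log lines, which your existing log4j-to-ElasticSearch
forwarding can then pick up.

import org.apache.log4j.Logger
import org.apache.spark.scheduler._

class EventForwardingListener extends SparkListener {
  private val log = Logger.getLogger("spark.app.events")

  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    log.info(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    log.info(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")

  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
    log.warn(s"Executor ${removed.executorId} removed: ${removed.reason}")
}

// Register it on the driver, e.g. right after creating the SparkSession:
// spark.sparkContext.addSparkListener(new EventForwardingListener())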

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Jan 13, 2021 at 5:15 PM raymond.tan 
wrote:

> Hello here, I am new to spark and am trying to add some monitoring for
> spark applications specifically to handle the below situations - 1 -
> Forwarding Spark Event Logs to identify critical events like job start,
> executor failures, job failures etc to ElasticSearch via log4j. However I
> could not find any way to foward event log via log4j configurations. Is
> there any other recommended approach to track these application events? 2 -
> For Spark streaming jobs, is there any way to identify that data from Kafka
> is not consumed for whatever reason, or the offsets are not progressing as
> expected and also forward that to ElasticSearch via log4j for monitoring
> Thanks, Raymond
> --
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>


Re: understanding spark shuffle file re-use better

2021-01-17 Thread Jacek Laskowski
Hi,

An interesting question that I must admit I'm not sure how to answer myself
actually :)

Off the top of my head, I'd **guess** that unless you cache the first query,
these two queries would share nothing. With caching, there's a phase in
query execution when a canonicalized version of a query is used to look up
any cached queries.

Again, I'm not really sure, and if I had to answer it (e.g. as part of an
interview) I'd say nothing would be shared / re-used.
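
To illustrate the caching path (just a sketch; the path and column names are
made up): persist and materialize the first DataFrame, and a later,
structurally-identical query gets matched against the cache by its
canonicalized plan, so the scan and the shuffle are skipped.

val first = spark.read.parquet("/data/events")
  .groupBy("userId")
  .count()

first.cache()
first.count()   // materializes the cache

// Built from scratch, yet canonically equal to the cached plan, so it is
// served from the cache instead of re-reading and re-shuffling the input.
val second = spark.read.parquet("/data/events")
  .groupBy("userId")
  .count()

second.count()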

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Jan 13, 2021 at 5:39 PM Koert Kuipers  wrote:

> is shuffle file re-use based on identity or equality of the dataframe?
>
> for example if run the exact same code twice to load data and do
> transforms (joins, aggregations, etc.) but without re-using any actual
> dataframes, will i still see skipped stages thanks to shuffle file re-use?
>
> thanks!
> koert
>


Re: Dynamic Spark metrics creation

2021-01-17 Thread Jacek Laskowski
Hey Yurii,

> which is unavailable from executors.

Register it on the driver and use accumulators on executors to update the
values (on the driver)?
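
Something along these lines perhaps (a sketch, not a drop-in solution; all
names are made up): a single map-backed AccumulatorV2 registered once on the
driver, updated from the executors with whatever country names show up, and
read back on the driver to publish the values.

import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

class CountByKeyAccumulator
    extends AccumulatorV2[(String, Long), Map[String, Long]] {
  private val counts = mutable.Map.empty[String, Long].withDefaultValue(0L)

  override def isZero: Boolean = counts.isEmpty
  override def copy(): CountByKeyAccumulator = {
    val c = new CountByKeyAccumulator
    c.counts ++= counts
    c
  }
  override def reset(): Unit = counts.clear()
  override def add(v: (String, Long)): Unit = counts(v._1) += v._2
  override def merge(other: AccumulatorV2[(String, Long), Map[String, Long]]): Unit =
    other.value.foreach { case (k, n) => counts(k) += n }
  override def value: Map[String, Long] = counts.toMap
}

// On the driver:
// val eventsByCountry = new CountByKeyAccumulator
// spark.sparkContext.register(eventsByCountry, "eventsByCountry")
// Inside FlatMapGroupsWithStateFunction (running on executors):
// eventsByCountry.add(countryOf(event) -> 1L)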

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


‪On Sat, Jan 16, 2021 at 2:21 PM ‫Yuri Oleynikov (יורי אולייניקוב‬‎ <
yur...@gmail.com> wrote:‬

> Hi all,
> I have a spark application with Arbitrary Stateful Aggregation implemented
> with FlatMapGroupsWithStateFunction.
>
> I want to make some statistics about incoming events inside
> FlatMapGroupsWithStateFunction.
> The statistics are made from some event property which on the one hand has
> dynamic values but on the other hand - small finite set (thought unknown)
> of values (e.g. country name).
>
> So I thought to register dynamic metrics inside
> FlatMapGroupsWithStateFunction but as far as I understand, this requires
> accessing MetricsSystem via SparkEnv.get() which is unavailable from
> executors.
>
> Any thoughts/suggestions?
>
> With best regards,
> Yurii
>
>


Re: Insertable records in Datasource v2.

2021-01-14 Thread Jacek Laskowski
Hi Rahul,

I think it's not and will not be supported. You should report an issue in
JIRA at https://issues.apache.org/jira/projects/SPARK, though don't expect a
solution for temporary views.

I did manage to reproduce the "AnalysisException: unresolved operator", and
even though INSERT INTO a view is not allowed (according to ResolveRelations),
that check did not get triggered.

Just like you I thought I could get past this exception
with InsertableRelation, but alas it didn't work either.

The temporary view based on the custom data source is indeed resolved, but the
InsertIntoTable logical operator is not, which leads to the exception you've
been facing:

unresolved operator 'InsertIntoTable RelationV2 custom[id#0L, name#1, v#2]
(Options: [paths=[]]), false, false;;
'InsertIntoTable RelationV2 custom[id#0L, name#1, v#2] (Options:
[paths=[]]), false, false
+- Project [_1#56 AS id#60, _2#57 AS name#61, _3#58 AS v#62]
   +- LocalRelation [_1#56, _2#57, _3#58]

By the way, this insert could also be expressed using Scala API as follows:

Seq((2, "insert_record1", 200), (20001, "insert_record2", 201))
  .toDF(insertDFWithSchema.schema.names: _*)
  .write
  .insertInto(sqlView)

In summary, you should report this to JIRA, but don't expect it to get fixed
other than by catching this case just to throw the "Inserting into a view is
not allowed" exception from ResolveRelations.

Unless I'm mistaken...

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, Jan 14, 2021 at 3:52 AM Rahul Kumar  wrote:

> I'm implementing V2 datasource for a custom datasource.
>
> I'm trying to insert a record into a temp view, in following fashion.
>
> insertDFWithSchema.createOrReplaceTempView(sqlView)
> spark.sql(s”insert into $sqlView  values (2, ‘insert_record1’, 200,
> 23000), (20001, ‘insert_record2’, 201, 23001)“). where insertDFWithSchema
> is
> some dataframe loaded from custom data source.
>
>
> I end up getting following exception
>
> *org.apache.spark.sql.AnalysisException: unresolved operator
> 'InsertIntoTable RelationV2* mydb[id#63, name#64, age#65, salary#66]
> (Options:
>
> [mydb.updateByKey=id,mydb.namespace=test,paths=[],mydb.set=input_data,mydb.se...),
> false, false;;
> 'InsertIntoTable RelationV2 mydb[id#63, name#64, age#65, salary#66]
> (Options:
>
> [mydb.updateByKey=id,mydb.namespace=test,paths=[],mydb.set=input_data,mydb.se...),
> false, false
> +- LocalRelation [col1#88, col2#89, col3#90, col4#91]
>   at
>
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
>   at
>
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
>   at
>
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
>   at
>
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
>   at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>   at
>
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
>   at
>
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
>   at
>
> org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
>   at
>
> org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
>   at
>
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
>   at
>
> org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
>   at
>
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
>   at
>
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
>   at
>
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)
>   ... 40 elided
>
>
> In V1 datasource implementation, I had insertable trait in BaseRelation. In
> v2, I'm not sure how it could be achieved. I have also tried implementing
> insertable trait in DefaultSource.   Any input would be extremely helpful.
>
> Thanks,
> Rahul
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Data source v2 streaming sinks does not support Update mode

2021-01-12 Thread Jacek Laskowski
Hi,

Can you post the whole message? I'm trying to find what might be causing
it. A small reproducible example would be of help too. Thank you.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
wrote:

> Trying to port my Spark 2.4 based (Structured) streaming application to
> Spark 3.0. I compiled it using the dependency given below:
>
> 
> org.apache.spark
> spark-sql-kafka-0-10_${scala.binary.version}
> 3.1.0
> 
>
>
> Every time I run it under Spark 3.0, I get this message: *Data source v2
> streaming sinks does not support Update mode*
>
> I am using '*mapGroupsWithState*' so as per this link (
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
> the only supported Output mode is "*Update*".
>
> My Sink is a Kafka topic so I am using this:
>
> .writeStream
> .format("kafka")
>
>
> What am I missing?
>
>
>


Re: Converting spark batch to spark streaming

2021-01-08 Thread Jacek Laskowski
Hi,

Start with DataStreamWriter.foreachBatch.
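
A minimal sketch (paths are made up), assuming the batch job reads text files
dropped into an HDFS folder: the folder becomes a file stream source, and the
function given to foreachBatch runs on the driver once per micro-batch, which
is where the driver-side steps can go before handing batchDF over to the
existing batch logic.

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("batch-to-streaming").getOrCreate()

val lines = spark.readStream
  .format("text")
  .load("hdfs:///incoming/folder")

val processBatch = (batchDF: DataFrame, batchId: Long) => {
  // driver-side preparation per arriving micro-batch goes here
  println(s"Batch $batchId arrived with ${batchDF.count()} new lines")
  // ...then run the existing batch transformations on batchDF...
}

val query = lines.writeStream
  .foreachBatch(processBatch)
  .option("checkpointLocation", "hdfs:///tmp/checkpoints/batch-to-streaming")
  .start()

query.awaitTermination()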

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, Jan 7, 2021 at 6:55 PM mhd wrk  wrote:

> I'm trying to convert a spark batch application to a streaming application
> and wondering what function (or design pattern) I should use to execute a
> series of operations inside the driver upon arrival of each message (a text
> file inside an HDFS folder) before starting computation inside executors.
>
> Thanks,
> Mohammad
>


Re: Impact of .localCheckpoint() and executor dying

2021-01-06 Thread Jacek Laskowski
Hi,

> impact of an executor dying after a localCheckpoint is taken.

My memory is a bit vague on this, but I'd not be surprised if this
localCheckpoint-ed RDD were "broken" and any action simply threw an exception
about missing partitions or similar. There's no way back.

I do wish someone with more skills in this area chimed in...

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Jan 6, 2021 at 8:30 PM Brett Larson 
wrote:

> Jacek,
> Thanks for your response, I am still trying to understand the impact of an
> executor dying after a localCheckpoint is taken.
>
> Would the entire spark application fail in this case due to the broken
> lineage? Or would the jobs associated with that executor need to be
> re-computed from scratch?
>
> Thank you!
>
>
> On Wed, Jan 6, 2021 at 1:09 PM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> > My understanding is that .localCheckpoint() breaks the lineage of the
>> RDD
>>
>> True.
>>
>> > and this requires that the entire RDD to be rebuild instead of being
>> able to recompute lost partitions.
>>
>> In a sense, it's as if you saved the partitions to executors and re-read
>> them back as source data (for this checkpointed RDD).
>>
>> > Does each executor store a copy of the entire RDD?
>>
>> No. An executor has got only the data of the partitions (for the tasks
>> this executor has executed).
>>
>> > Checkpoint over .localCheckpoint.
>>
>> checkpoint is similar to localCheckpoint, but slower and reliable (as
>> it's on a stable HDFS file system not on an ephemeral executor). In either
>> case, the lineage should be the same = cut.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>>
>> On Wed, Jan 6, 2021 at 6:15 PM brettplarson 
>> wrote:
>>
>>> Hello,
>>> I am wondering what the impact of using .localCheckpoint() and having the
>>> executor die would be?
>>>
>>> My understanding is that .localCheckpoint() breaks the lineage of the RDD
>>> and this requires that the entire RDD to be rebuild instead of being
>>> able to
>>> recompute lost partitions.
>>>
>>> Does each executor store a copy of the entire RDD?
>>>
>>> It's unclear to me the benefit of using Checkpoint over
>>> .localCheckpoint. (I
>>> am aware that this is HDFS backed, but it's unclear the implications of
>>> this)
>>>
>>> Please let me know,
>>> Thank you!
>>>
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>
> --
> *Brett Larson *
> brettpatricklar...@gmail.com / 847321200
>


Re: Impact of .localCheckpoint() and executor dying

2021-01-06 Thread Jacek Laskowski
Hi,

> My understanding is that .localCheckpoint() breaks the lineage of the RDD

True.

> and this requires that the entire RDD to be rebuild instead of being able
to recompute lost partitions.

In a sense, it's as if you saved the partitions to executors and re-read
them back as source data (for this checkpointed RDD).

> Does each executor store a copy of the entire RDD?

No. An executor has got only the data of the partitions (for the tasks this
executor has executed).

> Checkpoint over .localCheckpoint.

checkpoint is similar to localCheckpoint, but slower and more reliable (as the
data sits on a stable HDFS file system, not on an ephemeral executor). In
either case, the effect on the lineage should be the same = cut.
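
A tiny sketch of the difference (the checkpoint directory is made up; in both
cases the lineage is truncated once an action materializes the checkpoint):

val sc = spark.sparkContext
sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")

val reliable = sc.parallelize(1 to 1000).map(_ * 2)
reliable.checkpoint()   // reliable: partitions written under the checkpoint dir
reliable.count()        // the first action materializes the checkpoint

val local = sc.parallelize(1 to 1000).map(_ * 2).localCheckpoint()
local.count()           // partitions kept only on the executors that computed them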

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Jan 6, 2021 at 6:15 PM brettplarson 
wrote:

> Hello,
> I am wondering what the impact of using .localCheckpoint() and having the
> executor die would be?
>
> My understanding is that .localCheckpoint() breaks the lineage of the RDD
> and this requires that the entire RDD to be rebuild instead of being able
> to
> recompute lost partitions.
>
> Does each executor store a copy of the entire RDD?
>
> It's unclear to me the benefit of using Checkpoint over .localCheckpoint.
> (I
> am aware that this is HDFS backed, but it's unclear the implications of
> this)
>
> Please let me know,
> Thank you!
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-18 Thread Jacek Laskowski
Hi Rachana,

> Should I go backward and use Spark Streaming DStream based.

No. Never. It's no longer supported (and should really be removed from the
codebase once and for all - dreaming...).

Spark focuses on Spark SQL and Spark Structured Streaming as user-facing
modules for batch and streaming queries, respectively.

Please note that I'm not a PMC member or even a committer so I'm speaking
for myself only (not representing the project in an official way).

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, Jun 18, 2020 at 12:03 AM Rachana Srivastava
 wrote:

> *Structured Stream Vs Spark Steaming (DStream)?*
>
> Which is recommended for system stability.  Exactly once is NOT first
> priority.  First priority is STABLE system.
>
> I am I need to make a decision soon.  I need help.  Here is the question
> again.  Should I go backward and use Spark Streaming DStream based.  Write
> our own checkpoint and go from there.  At least we never encounter these
> metadata issues there.
>
> Thanks,
>
> Rachana
>
> On Wednesday, June 17, 2020, 02:02:20 PM PDT, Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>
> Just in case if anyone prefers ASF projects then there are other
> alternative projects in ASF as well, alphabetically, Apache Hudi [1] and
> Apache Iceberg [2]. Both are recently graduated as top level projects.
> (DISCLAIMER: I'm not involved in both.)
>
> BTW it would be nice if we make the metadata implementation on file stream
> source/sink be pluggable - from what I've seen, plugin approach has been
> selected as the way to go whenever some part is going to be complicated and
> it becomes arguable whether the part should be handled in Spark project vs
> should be outside. e.g. checkpoint manager, state store provider, etc. It
> would open up chances for the ecosystem to play with the challenge "without
> completely re-writing the file stream source and sink", focusing on
> scalability for metadata in a long run query. Alternative projects
> described above will still provide more higher-level features and
> look attractive, but sometimes it may be just "using a sledgehammer to
> crack a nut".
>
> 1. https://hudi.apache.org/
> 2. https://iceberg.apache.org/
>
>
> On Thu, Jun 18, 2020 at 2:34 AM Tathagata Das 
> wrote:
>
> Hello Rachana,
>
> Getting exactly-once semantics on files and making it scale to a very
> large number of files are very hard problems to solve. While Structured
> Streaming + built-in file sink solves the exactly-once guarantee that
> DStreams could not, it is definitely limited in other ways (scaling in
> terms of files, combining batch and streaming writes in the same place,
> etc). And solving this problem requires a holistic solution that is
> arguably beyond the scope of the Spark project.
>
> There are other projects that are trying to solve this file management
> issue. For example, Delta Lake <https://delta.io/>(full disclosure, I am
> involved in it) was built to exactly solve this problem - get exactly-once
> and ACID guarantees on files, but also scale to handling millions of files.
> Please consider it as part of your solution.
>
>
>
>
> On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava
>  wrote:
>
> I have written a simple spark structured steaming app to move data from
> Kafka to S3. Found that in order to support exactly-once guarantee spark
> creates _spark_metadata folder, which ends up growing too large as the
> streaming app is SUPPOSE TO run FOREVER. But when the streaming app runs
> for a long time the metadata folder grows so big that we start getting OOM
> errors. Only way to resolve OOM is delete Checkpoint and Metadata folder
> and loose VALUABLE customer data.
>
> Spark open JIRAs SPARK-24295 and SPARK-29995, SPARK-30462, and SPARK-24295)
> Since Spark Streaming was NOT broken like this. Is Spark Streaming a
> BETTER choice?
>
>


Re: BOOK review of Spark: WARNING to spark users

2020-05-21 Thread Jacek Laskowski
Hi Emma,

I'm curious about the purpose of the email. Mind elaborating?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, May 20, 2020 at 10:43 PM emma davis 
wrote:

>
> *Book:* Machine Learning with Apache Spark Quick Start Guide
> *publisher* : packt>
>
>
> *F**ollow**ing* this Getting Started with Python in VS Code
> https://code.visualstudio.com/docs/python/python-tutorial
>
> I realised Jillur Qudus has written and published a book without any
> knowledge
> of subject matter, amongst other things Python.
>
>
>
> *Highlighted proof with further details further down the email. *
>
> import findspark # these lines of code are unnecessary see link above for
> setup
> findspark.init()
>
> Setting SPARK_HOME or any other spark variables are unnecessary because
> Spark like any
> frameworks is self contained and has its own conf directory for startup 
> persistent
> configuration settings.
> Obviously the software would find its own current directory upon starting
> i.e. sbin/start-master.sh
>
> Spark is a BIG DATA tool ( heavy distributed ,parallelism processing) so
> clearly you would expect its hello world demo programs to demonstrate
> that.
>
> what is the point of setting num_samples=100. something like 10**10 would
> make sense to test performance.
>
>
>
> *This is my warning do not end up wasting your valuable time as I did .  I
> fee your time is valuable. *
> *I realise the scam as I got a better understanding of the product by just
> doing the correct hello world program from correct source. *
>
> “Research by CISQ found that, in 2018, poor quality software cost
> organizations $2.8 trillion in the US alone. “
>
> I attribute this to the Indian IT industry claiming they can do job better
> than the natives [US , Europeans.] Implying Indian Education or IT people
> is superior. For example People like me born, live and educated  in the
> western Europe
>
> *https://www.it-cisq.org/the-cost-of-poor-quality-software-in-the-us-a-2018-report/The-Cost-of-Poor-Quality-Software-in-the-US-2018-Report.pdf
> <https://www.it-cisq.org/the-cost-of-poor-quality-software-in-the-us-a-2018-report/The-Cost-of-Poor-Quality-Software-in-the-US-2018-Report.pdf>*
>
>
> *Contributors: About the Author*
> “*Jillur Qudus* is a lead technical architect, polygot software engineer
> and data scientist
> with over 10 years of hand-on experience in architecting and engineering
> distributed,
> scalable , high performance .. to combat serious organised crime. Jillur
> has extensive experience working with government, intelligence,law
> enforcement and banking, and has worked across the world including
> Japan,Singapore,Malysia,Hong Kong and New Zealand .. founder of keisan, a
> UK-based company specializing in open source distributed technologies and
> machine learning…“
> This obviously means a lot to many but when I look at his work Judge for
> yourself based on evidence.
>
> *Page 54*
> * ”*
> Additional Python Packages
> > conda install -c conda-forge findspark
> > conda install -c conda-forge pykafka
> ...”**
>
> The remainder of the program was copied from spark website so that wasn’t
> wrong.
> *Page 63*
>
> * “*
> > cd *etc*/profile.d
> vi spark.sh
> $ export SPARK_HOME=/opt/spark-2.3.2-bin-hadoop2.7
> > source spark.sh
>
> .. in order for the SPARK_HOME environment variable to be successfully
> recognized and registered by findspark ...
> ….
>
> We are now ready to write out first spark application in Python ! …..
>
> # (1) import required Python dependencies
> import findspark
> findspark.init()
>
> (3)
> ….
> num_samples = 100 *“ ***
>
>
> emma davis
> emma.davi...@aol.com
>
>


Re: Spark Window Documentation

2020-05-08 Thread Jacek Laskowski
Hi Neeraj,

I'd start from "Contributing Documentation Changes" in
https://spark.apache.org/contributing.html

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Fri, May 8, 2020 at 10:37 PM neeraj bhadani 
wrote:

> Thanks Jacek for sharing the details. I could see some example here
> https://github.com/apache/spark/blob/master/python/pyspark/sql/window.py#L83 
> as
> mentioned in original email but not sure where this is reflecting on spark
> documentation. Also, what would be the process to contribute to the spark
> docs. I check the section "Contributing Documentation Changes" at this
> link : h <https://spark.apache.org/contributing.html>
> ttps://spark.apache.org/contributing.html
> <https://spark.apache.org/contributing.html> but couldn't find a way to
> contribute. I might be missing something here.
>
> If someone can help on how to contribute to the spark docs would be great.
>
> Regards,
> Neeraj
>
>
> On Fri, May 8, 2020 at 12:39 PM Jacek Laskowski  wrote:
>
>> Hi Neeraj,
>>
>> I'm not a committer so I might be wrong, but there is no "blessed way" to
>> include examples.
>>
>> There are some examples in the official documentation at
>> http://spark.apache.org/docs/latest/sql-programming-guide.html but this
>> is how to use the general concepts not specific operators.
>>
>> There are some examples at http://spark.apache.org/examples.html
>>
>> I think the best way would be to include examples as close to the methods
>> as possible and scaladoc/javadoc would be best IMHO.
>>
>> p.s. Just yesterday there was this thread "What open source projects have
>> the best docs?" on twitter @
>> https://twitter.com/adamwathan/status/1257641015835611138. You could
>> borrow some ideas of the docs that are claimed "the best".
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>>
>> On Fri, May 8, 2020 at 11:34 AM neeraj bhadani <
>> bhadani.neeraj...@gmail.com> wrote:
>>
>>> Hi Team,
>>> I was looking for a Spark window function example on documentation.
>>>
>>> For example, I could the function definition and params are explained
>>> nicely here:
>>> https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Window.rowsBetween
>>>
>>> and this is the source which is available since spark version 2.1:
>>> https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/window.html#Window.rowsBetween
>>>
>>> But I couldn't find an example which helps to understand How it works.
>>>
>>> Although, while browsing the GitHub code I have found some example here:
>>> https://github.com/apache/spark/blob/master/python/pyspark/sql/window.py#L83
>>>
>>> which I couldn't find on the spark official doc page. Where and how this
>>> example is linked with the official spark documentation.
>>>
>>> If such examples are not available, Could you please share the process
>>> on how I can contribute examples to the spark documentation.
>>>
>>> Regards,
>>> Neeraj
>>>
>>


Re: java.lang.OutOfMemoryError Spark Worker

2020-05-08 Thread Jacek Laskowski
Hi,

It's been a while since I worked with Spark Standalone, but I'd check the
logs of the workers. How do you spark-submit the app?

Did you check the /grid/1/spark/work/driver-20200508153502-1291 directory?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Fri, May 8, 2020 at 2:32 PM Hrishikesh Mishra 
wrote:

> Thanks Jacek for quick response.
> Due to our system constraints, we can't move to Structured Streaming now.
> But definitely YARN can be tried out.
>
> But my problem is I'm able to figure out where is the issue, Driver,
> Executor, or Worker. Even exceptions are clueless.  Please see the below
> exception, I'm unable to spot the issue for OOM.
>
> 20/05/08 15:36:55 INFO Worker: Asked to kill driver
> driver-20200508153502-1291
>
> 20/05/08 15:36:55 INFO DriverRunner: Killing driver process!
>
> 20/05/08 15:36:55 INFO CommandUtils: Redirection to
> /grid/1/spark/work/driver-20200508153502-1291/stderr closed: Stream closed
>
> 20/05/08 15:36:55 INFO CommandUtils: Redirection to
> /grid/1/spark/work/driver-20200508153502-1291/stdout closed: Stream closed
>
> 20/05/08 15:36:55 INFO ExternalShuffleBlockResolver: Application
> app-20200508153654-11776 removed, cleanupLocalDirs = true
>
> 20/05/08 15:36:55 INFO Worker: Driver driver-20200508153502-1291 was
> killed by user
>
> *20/05/08 15:43:06 WARN AbstractChannelHandlerContext: An exception
> 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full
> stacktrace] was thrown by a user handler's exceptionCaught() method while
> handling the following exception:*
>
> *java.lang.OutOfMemoryError: Java heap space*
>
> *20/05/08 15:43:23 ERROR SparkUncaughtExceptionHandler: Uncaught exception
> in thread Thread[dispatcher-event-loop-6,5,main]*
>
> *java.lang.OutOfMemoryError: Java heap space*
>
> *20/05/08 15:43:17 WARN AbstractChannelHandlerContext: An exception
> 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full
> stacktrace] was thrown by a user handler's exceptionCaught() method while
> handling the following exception:*
>
> *java.lang.OutOfMemoryError: Java heap space*
>
> 20/05/08 15:43:33 INFO ExecutorRunner: Killing process!
>
> 20/05/08 15:43:33 INFO ExecutorRunner: Killing process!
>
> 20/05/08 15:43:33 INFO ExecutorRunner: Killing process!
>
> 20/05/08 15:43:33 INFO ShutdownHookManager: Shutdown hook called
>
> 20/05/08 15:43:33 INFO ShutdownHookManager: Deleting directory
> /grid/1/spark/local/spark-e045e069-e126-4cff-9512-d36ad30ee922
>
>
>
>
> On Fri, May 8, 2020 at 5:14 PM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> Sorry for being perhaps too harsh, but when you asked "Am I missing
>> something. " and I noticed this "Kafka Direct Stream" and "Spark Standalone
>> Cluster. " I immediately thought "Yeah...please upgrade your Spark env to
>> use Spark Structured Streaming at the very least and/or use YARN as the
>> cluster manager".
>>
>> Another thought was that the user code (your code) could be leaking
>> resources so Spark eventually reports heap-related errors that may not
>> necessarily be Spark's.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>>
>> On Thu, May 7, 2020 at 1:12 PM Hrishikesh Mishra 
>> wrote:
>>
>>> Hi
>>>
>>> I am getting out of memory error in worker log in streaming jobs in
>>> every couple of hours. After this worker dies. There is no shuffle, no
>>> aggression, no. caching  in job, its just a transformation.
>>> I'm not able to identify where is the problem, driver or executor. And
>>> why worker getting dead after the OOM streaming job should die. Am I
>>> missing something.
>>>
>>> Driver Memory:  2g
>>> Executor memory: 4g
>>>
>>> Spark Version:  2.4
>>> Kafka Direct Stream
>>> Spark Standalone Cluster.
>>>
>>>
>>> 20/05/06 12:52:20 INFO SecurityManager: SecurityManager: authentication
>>> disabled; ui acls disabled; users  with view permissions: Set(root); groups
>>> with view permissions: Set(); users  with modify permissions: Set(root);
>>> groups with modify permissions: Set()
>>>
>>> 20/05/06 12:53:03 ERROR Spark

Re: java.lang.OutOfMemoryError Spark Worker

2020-05-08 Thread Jacek Laskowski
Hi,

Sorry for being perhaps too harsh, but when you asked "Am I missing
something. " and I noticed this "Kafka Direct Stream" and "Spark Standalone
Cluster. " I immediately thought "Yeah...please upgrade your Spark env to
use Spark Structured Streaming at the very least and/or use YARN as the
cluster manager".

Another thought was that the user code (your code) could be leaking
resources so Spark eventually reports heap-related errors that may not
necessarily be Spark's.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Thu, May 7, 2020 at 1:12 PM Hrishikesh Mishra 
wrote:

> Hi
>
> I am getting out of memory error in worker log in streaming jobs in every
> couple of hours. After this worker dies. There is no shuffle, no
> aggression, no. caching  in job, its just a transformation.
> I'm not able to identify where is the problem, driver or executor. And why
> worker getting dead after the OOM streaming job should die. Am I missing
> something.
>
> Driver Memory:  2g
> Executor memory: 4g
>
> Spark Version:  2.4
> Kafka Direct Stream
> Spark Standalone Cluster.
>
>
> 20/05/06 12:52:20 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users  with view permissions: Set(root); groups
> with view permissions: Set(); users  with modify permissions: Set(root);
> groups with modify permissions: Set()
>
> 20/05/06 12:53:03 ERROR SparkUncaughtExceptionHandler: Uncaught exception
> in thread Thread[ExecutorRunner for app-20200506124717-10226/0,5,main]
>
> java.lang.OutOfMemoryError: Java heap space
>
> at org.apache.xerces.util.XMLStringBuffer.append(Unknown Source)
>
> at org.apache.xerces.impl.XMLEntityScanner.scanData(Unknown Source)
>
> at org.apache.xerces.impl.XMLScanner.scanComment(Unknown Source)
>
> at
> org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanComment(Unknown
> Source)
>
> at
> org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown
> Source)
>
> at
> org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown
> Source)
>
> at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>
> at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>
> at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
>
> at org.apache.xerces.parsers.DOMParser.parse(Unknown Source)
>
> at org.apache.xerces.jaxp.DocumentBuilderImpl.parse(Unknown Source)
>
> at javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:150)
>
> at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2480)
>
> at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2468)
>
> at
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2539)
>
> at
> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>
> at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:1143)
>
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:1115)
>
> at
> org.apache.spark.deploy.SparkHadoopUtil$.org$apache$spark$deploy$SparkHadoopUtil$$appendS3AndSparkHadoopConfigurations(SparkHadoopUtil.scala:464)
>
> at
> org.apache.spark.deploy.SparkHadoopUtil$.newConfiguration(SparkHadoopUtil.scala:436)
>
> at
> org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:114)
>
> at org.apache.spark.SecurityManager.(SecurityManager.scala:114)
>
> at org.apache.spark.deploy.worker.ExecutorRunner.org
> $apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:149)
>
> at
> org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)
>
> 20/05/06 12:53:38 INFO DriverRunner: Worker shutting down, killing driver
> driver-20200505181719-1187
>
> 20/05/06 12:53:38 INFO DriverRunner: Killing driver process!
>
>
>
>
> Regards
> Hrishi
>


Re: Spark Window Documentation

2020-05-08 Thread Jacek Laskowski
Hi Neeraj,

I'm not a committer so I might be wrong, but there is no "blessed way" to
include examples.

There are some examples in the official documentation at
http://spark.apache.org/docs/latest/sql-programming-guide.html, but these show
how to use the general concepts, not specific operators.

There are some examples at http://spark.apache.org/examples.html

I think the best way would be to include examples as close to the methods
as possible, so scaladoc/javadoc would be my pick IMHO.
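
For Window.rowsBetween, the kind of example I have in mind could look like this
(a sketch in Scala with made-up data: a running sum over the rows from the
start of each partition up to the current row):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import spark.implicits._

val df = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 10), ("b", 20)).toDF("key", "value")

val w = Window
  .partitionBy("key")
  .orderBy("value")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// running_sum becomes 1, 3, 6 for key "a" and 10, 30 for key "b"
df.withColumn("running_sum", sum("value").over(w)).show()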

p.s. Just yesterday there was this thread "What open source projects have
the best docs?" on twitter @
https://twitter.com/adamwathan/status/1257641015835611138. You could borrow
some ideas from the docs that are claimed to be "the best".

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Fri, May 8, 2020 at 11:34 AM neeraj bhadani 
wrote:

> Hi Team,
> I was looking for a Spark window function example on documentation.
>
> For example, I could the function definition and params are explained
> nicely here:
> https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Window.rowsBetween
>
> and this is the source which is available since spark version 2.1:
> https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/window.html#Window.rowsBetween
>
> But I couldn't find an example which helps to understand How it works.
>
> Although, while browsing the GitHub code I have found some example here:
> https://github.com/apache/spark/blob/master/python/pyspark/sql/window.py#L83
>
> which I couldn't find on the spark official doc page. Where and how this
> example is linked with the official spark documentation.
>
> If such examples are not available, Could you please share the process on
> how I can contribute examples to the spark documentation.
>
> Regards,
> Neeraj
>


Re: Release Apache Spark 2.4.4 before 3.0.0

2019-07-11 Thread Jacek Laskowski
Hi,

Thanks Dongjoon Hyun for stepping up as a release manager!
Much appreciated.

If there's a volunteer to cut a release, I'm always happy to support it.

In addition, the more frequent the releases, the better for end users: they
get the choice to upgrade and pick up all the latest fixes, or to wait. It's
their call, not ours (otherwise we'd keep them waiting).

My big 2 yes'es for the release!

Jacek


On Tue, 9 Jul 2019, 18:15 Dongjoon Hyun,  wrote:

> Hi, All.
>
> Spark 2.4.3 was released two months ago (8th May).
>
> As of today (9th July), there exist 45 fixes in `branch-2.4` including the
> following correctness or blocker issues.
>
> - SPARK-26038 Decimal toScalaBigInt/toJavaBigInteger not work for
> decimals not fitting in long
> - SPARK-26045 Error in the spark 2.4 release package with the
> spark-avro_2.11 dependency
> - SPARK-27798 from_avro can modify variables in other rows in local
> mode
> - SPARK-27907 HiveUDAF should return NULL in case of 0 rows
> - SPARK-28157 Make SHS clear KVStore LogInfo for the blacklist entries
> - SPARK-28308 CalendarInterval sub-second part should be padded before
> parsing
>
> It would be great if we can have Spark 2.4.4 before we are going to get
> busier for 3.0.0.
> If it's okay, I'd like to volunteer for an 2.4.4 release manager to roll
> it next Monday. (15th July).
> How do you think about this?
>
> Bests,
> Dongjoon.
>


Re: Change parallelism number in Spark Streaming

2019-06-27 Thread Jacek Laskowski
Hi,

I've got a talk "The internals of stateful stream processing in Spark
Structured Streaming" at https://dataxday.fr/ today and am going to include
the tool on the slides to thank you for the work. Thanks.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming
https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski



On Thu, Jun 27, 2019 at 3:32 AM Jungtaek Lim  wrote:

> Glad to help, Jacek.
>
> I'm happy you're doing similar thing, which means it could be pretty
> useful for others as well. Looks like it might be good enough to contribute
> state source and sink. I'll sort out my code and submit a PR.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Thu, Jun 27, 2019 at 7:54 AM Jacek Laskowski  wrote:
>
>> Hi Jungtaek,
>>
>> That's very helpful to have the state source. As a matter of fact I've
>> just this week been working on a similar tool (!) and have been wondering
>> how to recreate the schema of the state key and value. You've helped me a
>> lot. Thanks.
>>
>> Jacek
>>
>> On Wed, 26 Jun 2019, 23:58 Jungtaek Lim,  wrote:
>>
>>> Hi,
>>>
>>> you could consider the state operator's partition number as the "max
>>> parallelism", as parallelism can be reduced by applying coalesce. It would
>>> effectively work similarly to key groups.
>>>
>>> If you're also considering offline query, there's a tool to manipulate
>>> state which enables reading and writing state in structured streaming,
>>> achieving rescaling and schema evolution.
>>>
>>> https://github.com/HeartSaVioR/spark-state-tools
>>> (DISCLAIMER: I'm an author of this tool.)
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei 
>>> wrote:
>>>
>>>> Thank you for your quick reply!
>>>>
>>>> Is there any plan to improve this?
>>>>
>>>> I asked this question due to some investigation comparing those
>>>> state-of-the-art streaming systems, among which Flink and DataFlow allow
>>>> changing the parallelism number, and to my knowledge of Spark Streaming, it
>>>> seems it should also be able to do that: if some “key interval” concept is used,
>>>> then state can somehow be decoupled from the partition number by consistent
>>>> hashing.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Regards
>>>>
>>>> Jialei
>>>>
>>>>
>>>>
>>>> *From: *Jacek Laskowski 
>>>> *Date: *Wednesday, June 26, 2019 at 11:00 AM
>>>> *To: *"Rong, Jialei" 
>>>> *Cc: *"user @spark" 
>>>> *Subject: *Re: Change parallelism number in Spark Streaming
>>>>
>>>>
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> It's not allowed to change the number of partitions after your streaming
>>>> query is started.
>>>>
>>>>
>>>>
>>>> The reason is exactly the number of state stores which is exactly the
>>>> number of partitions (perhaps multiplied by the number of stateful
>>>> operators).
>>>>
>>>>
>>>>
>>>> I think you'll even get a warning or an exception when you change it
>>>> after restarting the query.
>>>>
>>>>
>>>>
>>>> The number of partitions is stored in a checkpoint location.
>>>>
>>>>
>>>>
>>>> Jacek
>>>>
>>>>
>>>>
>>>> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
>>>> wrote:
>>>>
>>>> Hi Dear Spark Expert
>>>>
>>>>
>>>>
>>>> I’m curious about a question regarding Spark Streaming/Structured
>>>> Streaming: whether it allows to change parallelism number(the default one
>>>> or the one specified in particular operator) in a stream having stateful
>>>> transform/operator? Whether this will cause my checkpointed state get
>>>> messed up?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Regards
>>>>
>>>> Jialei
>>>>
>>>>
>>>>
>>>>
>>>
>>> --
>>> Name : Jungtaek Lim
>>> Blog : http://medium.com/@heartsavior
>>> Twitter : http://twitter.com/heartsavior
>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>>
>>
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>


Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jacek Laskowski
Hi Jungtaek,

That's very helpful to have the state source. As a matter of fact I've just
this week been working on a similar tool (!) and have been wondering how to
recreate the schema of the state key and value. You've helped me a lot.
Thanks.

Jacek

On Wed, 26 Jun 2019, 23:58 Jungtaek Lim,  wrote:

> Hi,
>
> you could consider the state operator's partition number as the "max
> parallelism", as parallelism can be reduced by applying coalesce. It would
> effectively work similarly to key groups.
>
> If you're also considering offline query, there's a tool to manipulate
> state which enables reading and writing state in structured streaming,
> achieving rescaling and schema evolution.
>
> https://github.com/HeartSaVioR/spark-state-tools
> (DISCLAIMER: I'm an author of this tool.)
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei 
> wrote:
>
>> Thank you for your quick reply!
>>
>> Is there any plan to improve this?
>>
>> I asked this question due to some investigation comparing those
>> state-of-the-art streaming systems, among which Flink and DataFlow allow changing
>> the parallelism number, and to my knowledge of Spark Streaming, it seems it is
>> also able to do that: if some “key interval” concept is used, then state
>> can somehow be decoupled from the partition number by consistent hashing.
>>
>>
>>
>>
>>
>> Regards
>>
>> Jialei
>>
>>
>>
>> *From: *Jacek Laskowski 
>> *Date: *Wednesday, June 26, 2019 at 11:00 AM
>> *To: *"Rong, Jialei" 
>> *Cc: *"user @spark" 
>> *Subject: *Re: Change parallelism number in Spark Streaming
>>
>>
>>
>> Hi,
>>
>>
>>
>> It's not allowed to change the number of partitions after your streaming
>> query is started.
>>
>>
>>
>> The reason is exactly the number of state stores which is exactly the
>> number of partitions (perhaps multiplied by the number of stateful
>> operators).
>>
>>
>>
>> I think you'll even get a warning or an exception when you change it
>> after restarting the query.
>>
>>
>>
>> The number of partitions is stored in a checkpoint location.
>>
>>
>>
>> Jacek
>>
>>
>>
>> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
>> wrote:
>>
>> Hi Dear Spark Expert
>>
>>
>>
>> I’m curious about a question regarding Spark Streaming/Structured
>> Streaming: whether it allows to change parallelism number(the default one
>> or the one specified in particular operator) in a stream having stateful
>> transform/operator? Whether this will cause my checkpointed state get
>> messed up?
>>
>>
>>
>>
>>
>> Regards
>>
>> Jialei
>>
>>
>>
>>
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>


Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jacek Laskowski
Hi,

No idea. I've just begun exploring the current state of state management in
spark structured streaming. I'd not be surprised if what you're after were
not possible. Stateful stream processing in SSS is fairly young.

Jacek

On Wed, 26 Jun 2019, 21:48 Rong, Jialei,  wrote:

> Thank you for your quick reply!
>
> Is there any plan to improve this?
>
> I asked this question due to some investigation comparing those
> state-of-the-art streaming systems, among which Flink and DataFlow allow changing
> the parallelism number, and to my knowledge of Spark Streaming, it seems it is
> also able to do that: if some “key interval” concept is used, then state
> can somehow be decoupled from the partition number by consistent hashing.
>
>
>
>
>
> Regards
>
> Jialei
>
>
>
> *From: *Jacek Laskowski 
> *Date: *Wednesday, June 26, 2019 at 11:00 AM
> *To: *"Rong, Jialei" 
> *Cc: *"user @spark" 
> *Subject: *Re: Change parallelism number in Spark Streaming
>
>
>
> Hi,
>
>
>
> It's not allowed to change the number of partitions after your streaming
> query is started.
>
>
>
> The reason is exactly the number of state stores which is exactly the
> number of partitions (perhaps multiplied by the number of stateful
> operators).
>
>
>
> I think you'll even get a warning or an exception when you change it after
> restarting the query.
>
>
>
> The number of partitions is stored in a checkpoint location.
>
>
>
> Jacek
>
>
>
> On Wed, 26 Jun 2019, 19:30 Rong, Jialei, 
> wrote:
>
> Hi Dear Spark Expert
>
>
>
> I’m curious about a question regarding Spark Streaming/Structured
> Streaming: whether it allows to change parallelism number(the default one
> or the one specified in particular operator) in a stream having stateful
> transform/operator? Whether this will cause my checkpointed state get
> messed up?
>
>
>
>
>
> Regards
>
> Jialei
>
>
>
>


Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jacek Laskowski
Hi,

It's not allowed to change the number of partitions after your streaming
query is started.

The reason is exactly the number of state stores which is exactly the
number of partitions (perhaps multiplied by the number of stateful
operators).

I think you'll even get a warning or an exception when you change it after
restarting the query.

The number of partitions is stored in a checkpoint location.
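A minimal sketch of the practical consequence, as far as I understand it (the rate
source and checkpoint path are made up): whatever spark.sql.shuffle.partitions is at
the very first start of the query is what the state layout recorded in the checkpoint
gets locked to, so set it before that first start.

spark.conf.set("spark.sql.shuffle.partitions", "50")   // before the first start only

val counts = spark.readStream
  .format("rate")
  .load()
  .groupBy("value")
  .count()

counts.writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/checkpoint/demo")
  .start()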

Jacek

On Wed, 26 Jun 2019, 19:30 Rong, Jialei,  wrote:

> Hi Dear Spark Expert
>
>
>
> I’m curious about a question regarding Spark Streaming/Structured
> Streaming: whether it allows to change parallelism number(the default one
> or the one specified in particular operator) in a stream having stateful
> transform/operator? Whether this will cause my checkpointed state get
> messed up?
>
>
>
>
>
> Regards
>
> Jialei
>
>
>


Re: Spark 2.2 With Column usage

2019-06-11 Thread Jacek Laskowski
Hi,

Why are you doing the following two lines?

.select("id",lit(referenceFiltered))
.selectexpr(
"id"
)

What are you trying to achieve? What's lit and what's referenceFiltered?
What's the difference between select and selectexpr? Please start at
http://spark.apache.org/docs/latest/sql-programming-guide.html and then hop
onto
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package
to
know the Spark API better. I'm sure you'll quickly find out the answer(s).

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming
https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski



On Sat, Jun 8, 2019 at 12:53 PM anbutech  wrote:

> Thanks Jacek Laskowski Sir, but I didn't get the point here.
>
> Please advise: is the below what you are expecting?
>
> dataset1.as("t1")
>
> join(dataset3.as("t2"),
>
> col(t1.col1) === col(t2.col1), JOINTYPE.Inner )
>
> .join(dataset4.as("t3"), col(t3.col1) === col(t1.col1),
>
> JOINTYPE.Inner)
> .select("id",lit(referenceFiltered))
> .selectexpr(
> "id"
> )
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark logging questions

2019-06-08 Thread Jacek Laskowski
Hi,

What are "the spark driver and executor threads information" and "spark
application logging"?

Spark uses log4j so set up logging levels appropriately and you should be
done.
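As a hedged starting point, a conf/log4j.properties based on the template Spark ships
with (worth double-checking against the template in your distribution); the %t
conversion pattern adds the thread name to every log line on the driver and, if the
file is also made available to them, on the executors:

log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p [%t] %c{1}: %m%n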

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming
https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski



On Fri, Jun 7, 2019 at 1:13 PM test test  wrote:

> Hello,
>
> How can we dump the Spark driver and executor thread information in the Spark
> application logging?
>
>
> PS: submitting spark job using spark submit
>
> Regards
> Rohit
>


Re: Spark 2.2 With Column usage

2019-06-08 Thread Jacek Laskowski
Hi,

> val referenceFiltered = dataset2.filter(_.dataDate ==
> date).filter(_.someColumn).select("id").toString
> .withColumn("new_column",lit(referenceFiltered))

That won't work since lit is a function (adapter) to convert Scala values
to Catalyst expressions.
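To make that point concrete, a hedged sketch only (it reuses dataset2 and date from
the snippet quoted below, assumes spark.implicits._ is in scope, and joinedDF is a
hypothetical name for the result of the joins): lit wraps a plain Scala value, so a
value computed from another Dataset has to be brought to the driver first.

import org.apache.spark.sql.functions.lit

val referenceId: String = dataset2
  .filter($"dataDate" === date)
  .select($"id".cast("string"))
  .as[String]
  .head()                                    // a single value on the driver

val result = joinedDF.withColumn("new_column", lit(referenceId))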

Unless I'm mistaken, in your case, what you really need is to replace
`withColumn` with `select("id")` itself and you're done.

When I'm writing this (I'm saying exactly what you actually have already)
and I'm feeling confused.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming
https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski



On Sat, Jun 8, 2019 at 6:05 AM anbutech  wrote:

> Hi Sir,
>
> Could you please advise to fix the below issue in the withColumn in the
> spark 2.2 scala 2.11 joins
>
> def processing(spark:SparkSession,
>
> dataset1:Dataset[Reference],
>
> dataset2:Dataset[DataCore],
>
> dataset3:Dataset[ThirdPartyData] ,
>
> dataset4:Dataset[OtherData]
>
> date:String):Dataset[DataMerge] {
>
> val referenceFiltered = dataset2.filter(_.dataDate ==
> date).filter(_.someColumn).select("id").toString
>
> dataset1.as("t1)
>
> join(dataset3.as("t2"),
>
> col(t1.col1) === col(t2.col1), JOINTYPE.Inner )
>
> .join(dataset4.as("t3"), col(t3.col1) === col(t1.col1),
>
> JOINTYPE.Inner)
>
> .withColumn("new_column",lit(referenceFiltered))
>
> .selectexpr(
>
> "id", ---> want to get this value
>
> "column1,
>
> "column2,
>
> "column3",
>
> "column4" )
>
> }
>
> How do I get the String value, let's say the value "124567"
> ("referenceFiltered"), inside the withColumn?
>
> I'm getting the withColumn output as "id: BigInt". I want to get the same
> value for all the records.
>
> Note:
>
> I have asked not use cross join in the code. Any other way to fix this
> issue.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[SQL] Why casting string column to timestamp gives null?

2019-06-07 Thread Jacek Laskowski
Hi,

Why is casting a string column to timestamp not giving the same results as
going through casting to long in-between? I'm tempted to consider it a bug.

scala> spark.version
res4: String = 2.4.3

scala> Seq("1", "2").toDF("ts").select($"ts" cast "timestamp").show
+----+
|  ts|
+----+
|null|
|null|
+----+

scala> Seq("1", "2").toDF("ts").select($"ts" cast "long").select($"ts" cast
"timestamp").show
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 01:00:01|
|1970-01-01 01:00:02|
+-------------------+

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
The Internals of Spark SQL https://bit.ly/spark-sql-internals
The Internals of Spark Structured Streaming
https://bit.ly/spark-structured-streaming
The Internals of Apache Kafka https://bit.ly/apache-kafka-internals
Follow me at https://twitter.com/jaceklaskowski


Re: What is the difference for the following UDFs?

2019-05-14 Thread Jacek Laskowski
Hi,

For this particular case I'd use Column.substr (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column),
e.g.

val ns = Seq(("hello world", 1, 5)).toDF("w", "b", "e")
scala> ns.select($"w".substr($"b", $"e" - $"b" + 1) as "demo").show
+-----+
| demo|
+-----+
|hello|
+-----+

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Tue, May 14, 2019 at 5:08 PM Qian He  wrote:

> For example, I have a dataframe with 3 columns: URL, START, END. For each
> url from URL column, I want to fetch a substring of it starting from START
> and ending at END.
> +--------------+-----+---+
> |           URL|START|END|
> +--------------+-----+---+
> |www.amazon.com|    4| 14|
> | www.yahoo.com|    4| 13|
> |www.amazon.com|    4| 14|
> |www.google.com|    4| 14|
> +--------------+-----+---+
>
> I have UDF1:
>
> def getSubString = (input: String, start: Int, end: Int) => {
>input.substring(start, end)
> }
> val udf1 = udf(getSubString)
>
> and another UDF2:
>
> def getColSubString()(c1: Column, c2: Column, c3: Column): Column = {
>c1.substr(c2, c3-c2)
> }
>
> Let's assume they can both generate the result I want. But, from performance 
> perspective, is there any difference between those two UDFs?
>
>
>


Re: Spark SQL met "Block broadcast_xxx not found"

2019-05-07 Thread Jacek Laskowski
Hi,

I'm curious about "I found the bug code". Can you point me at it? Thanks.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Tue, May 7, 2019 at 9:34 AM Xilang Yan  wrote:

> OK... I am sure it is a bug in Spark. I found the buggy code, but the code was
> removed in 2.2.3, so I just upgraded Spark to fix the problem.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to use same SparkSession in another app?

2019-04-16 Thread Jacek Laskowski
Hi,

Not possible. What are you really trying to do? Why do you need to share
dataframes? They're nothing but metadata of a distributed computation (no
data inside) so what would be the purpose of such sharing?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Tue, Apr 16, 2019 at 1:57 PM Rishikesh Gawade 
wrote:

> Hi.
> I wish to use a SparkSession created by one app in another app so that I
> can use the DataFrames belonging to that session. Is it possible to use the
> same SparkSession in another app?
> Thanks,
> Rishikesh
>


Re: Observing DAGScheduler Log Messages

2019-04-07 Thread Jacek Laskowski
Hi,

Add the following line to conf/log4j.properties and you should have all the
logs:

log4j.logger.org.apache.spark.scheduler.DAGScheduler=ALL

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Sun, Apr 7, 2019 at 6:05 PM M Bilal  wrote:

> Hi,
>
> I want to observe the log messages from DAGScheduler in Apache Spark.
> Which log files do I need to check?
> I have tried observing the driver logs and worker stderr logs but I can't
> find any messages that are from that class.
>
>  I am using Spark 3.0.0 snapshot in standalone mode.
>
> Thanks.
>
> Regards,
> Bilal
>


Re: Spark 2.4 partitions and tasks

2019-02-12 Thread Jacek Laskowski
Hi,

Can you show the plans with explain(extended=true) for both versions?
That's where I'd start to pinpoint the issue. Perhaps the underlying
execution engine changed in a way that affects keyBy? Dunno, just guessing...

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Fri, Feb 8, 2019 at 5:09 PM Pedro Tuero  wrote:

> I did a repartition to 1 (hardcoded) before the keyBy and it ends in
> 1.2 minutes.
> The questions remain open, because I don't want to hardcode parallelism.
>
> El vie., 8 de feb. de 2019 a la(s) 12:50, Pedro Tuero (
> tuerope...@gmail.com) escribió:
>
>> 128 is the default parallelism defined for the cluster.
>> The question now is why keyBy operation is using default parallelism
>> instead of the number of partition of the RDD created by the previous step
>> (5580).
>> Any clues?
>>
>> El jue., 7 de feb. de 2019 a la(s) 15:30, Pedro Tuero (
>> tuerope...@gmail.com) escribió:
>>
>>> Hi,
>>> I am running a job in spark (using aws emr) and some stages are taking a
>>> lot more using spark  2.4 instead of Spark 2.3.1:
>>>
>>> Spark 2.4:
>>> [image: image.png]
>>>
>>> Spark 2.3.1:
>>> [image: image.png]
>>>
>>> With Spark 2.4, the keyBy operation take more than 10X what it took with
>>> Spark 2.3.1
>>> It seems to be related to the number of tasks / partitions.
>>>
>>> Questions:
>>> - Is it not supposed that the number of task of a job is related to
>>> number of parts of the RDD left by the previous job? Did that change in
>>> version 2.4??
>>> - Which tools/ configuration may I try, to reduce this aberrant
>>> downgrade of performance??
>>>
>>> Thanks.
>>> Pedro.
>>>
>>


Re: structured streaming handling validation and json flattening

2019-02-11 Thread Jacek Laskowski
Hi Lian,

"What have you tried?" would be a good starting point. Any help on this?

How do you read the JSONs? readStream.json? You could use readStream.text
followed by filter to include/exclude good/bad JSONs.
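A hedged sketch of that idea (the path, schema and column names are made up): read
the raw text, parse with from_json, and split on whether parsing succeeded, since
from_json yields null for records that don't match the schema.

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val schema = new StructType().add("id", StringType).add("payload", StringType)

val raw = spark.readStream.text("/path/to/landing")               // one JSON per line

val parsed = raw.withColumn("json", from_json(col("value"), schema))
val good = parsed.filter(col("json").isNotNull).select("json.*")  // write these as parquet
val bad  = parsed.filter(col("json").isNull)                      // log / route elsewhere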

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Sat, Feb 9, 2019 at 8:25 PM Lian Jiang  wrote:

> Hi,
>
> We have a structured streaming job that converts JSON into Parquet files. We
> want to validate the JSON records. If a JSON record is not valid, we want
> to log a message and refuse to write it into the Parquet output. Also, the JSON
> has nested JSONs and we want to flatten the nested JSONs into other Parquet files
> by using the same streaming job. My questions are:
>
> 1. how to validate the JSON records in a structured streaming job?
> 2. how to flatten the nested JSONs in a structured streaming job?
> 3. is it possible to use one structured streaming job to validate JSON,
> convert JSON into a Parquet file and convert nested JSONs into other Parquet files?
>
> I think unstructured streaming can achieve these goals but Structured
> Streaming is recommended by the Spark community.
>
> Appreciate your feedback!
>


Re: Where is the DAG stored before catalyst gets it?

2018-10-06 Thread Jacek Laskowski
Hi Jean Georges,

> I am assuming it is still in the master and when catalyst is finished it
sends the tasks to the workers.

Sorry to be that direct, but the sentence does not make much sense to me.
Again, very sorry for saying it in the very first sentence. Since I know
Jean Georges I allowed myself for more openness.

In other words, "the master" part seems to suggest that you use Spark
Standalone cluster. Correct? Other cluster use different naming for the
master/manager node.

"when catalyst is finished" that one is really tough to understand. You
mean once all the optimizations are applied and the query is ready for
execution? The final output of the "query execution pipeline" is to
generate a RDD with the right code for execution. At this phase, the query
is more an RDD than a Dataset.

"it sends the tasks to the workers." since we're talking about an RDD, this
abstraction is planned as a set of tasks (one per partition of the RDD).
And yes, the tasks are sent out over the wire to executors. It's been like
this from Spark 1.0 (and even earlier).

Hope I helped a bit.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Fri, Oct 5, 2018 at 12:36 AM Jean Georges Perrin  wrote:

> Hi,
>
> I am assuming it is still in the master and when catalyst is finished it
> sends the tasks to the workers.
>
> Correct?
>
> tia
>
> jg
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark code to write to MySQL and Hive

2018-08-29 Thread Jacek Laskowski
Hi,

I haven't checked my answer (too lazy today), but I think I know what might
be going on.

tl;dr Use cache to preserve the initial set of rows from mysql

After you append new rows, you will have twice as many rows as you had
previously. Correct?

Since newDF references the table every time you use it in a structured
query, say to write it to a table, the source table will get re-loaded and
hence the number of rows changes.

What you should do is to execute newDF.cache.count right after val newDF =
mysqlDF.select... so the data (rows) remains on executors and won't get
reloaded.
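A sketch of where the cache would go, based on the code quoted below (no other
changes):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)

val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
newDF.cache().count()                  // materialize the rows once, on executors

newDF.write.mode(SaveMode.Append).jdbc(jdbcUrl, table, properties)
newDF.write.saveAsTable("cities")      // still sees the cached snapshot, not a re-read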

Hope that helps.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Wed, Aug 29, 2018 at 4:59 PM  wrote:

> Sorry, last mail format was not good.
>
>
>
> *println*(*"Going to talk to mySql"*)
>
>
> *// Read table from mySQL.**val *mysqlDF = spark.read.jdbc(jdbcUrl,
> table, properties)
> *println*(*"I am back from mySql"*)
>
> mysqlDF.show()
>
>
> *// Create a new Dataframe with column 'id' increased to avoid Duplicate
> primary keys**val *newDF = mysqlDF.select((*col*(*"id"*) + 10).as(*"id"*),
> *col*(*"country"*), *col*(*"city"*))
> newDF.printSchema()
> newDF.show()
>
>
> *// Insert records into the table.*newDF.write
>   .mode(SaveMode.*Append*)
>   .jdbc(jdbcUrl, table, properties)
>
>
> *// Write to Hive - This Creates a new table.*newDF.write.saveAsTable(
> *"cities"*)
> newDF.show()
>
>
>
>
>
>
>
> Going to talk to mySql
>
> I am back from mySql
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> |  1|           USA|Palo Alto|
> |  2|Czech Republic|     Brno|
> |  3|           USA|Sunnyvale|
> |  4|          null|     null|
> +---+--------------+---------+
>
>
>
> root
>
> |-- id: long (nullable = false)
>
> |-- country: string (nullable = true)
>
> |-- city: string (nullable = true)
>
>
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> +---+--------------+---------+
>
>
>
> +---+--------------+---------+
> | id|       country|     city|
> +---+--------------+---------+
> | 11|           USA|Palo Alto|
> | 12|Czech Republic|     Brno|
> | 13|           USA|Sunnyvale|
> | 14|          null|     null|
> | 24|          null|     null|
> | 23|           USA|Sunnyvale|
> | 22|Czech Republic|     Brno|
> | 21|           USA|Palo Alto|
> +---+--------------+---------+
>
>
>
> Thanks,
>
> Ravi
>
>
>
> *From:* ryanda...@gmail.com 
> *Sent:* Wednesday, August 29, 2018 8:19 PM
> *To:* user@spark.apache.org
> *Subject:* Spark code to write to MySQL and Hive
>
>
>
> Hi,
>
>
>
> Can anyone help me to understand what is happening with my code ?
>
>
>
> I wrote a Spark application to read from a MySQL table [that already has 4
> records] and create a new DF by adding 10 to the ID field. Then, I wanted to
> write the new DF to MySQL as well as to Hive.
>
>
>
> I am surprised to see an additional set of records in Hive!! I am not able
> to understand how the newDF has records with IDs 21 to 24. I know that
> a DF is immutable. If so, how come it has 4 records at one point and 8
> records at a later point?
>
>
>
>
> // Read table from mySQL.
> val mysqlDF = spark.read.jdbc(jdbcUrl, table, properties)
> println("I am back from mySql")
>
> mysqlDF.show()
>
> // Create a new DataFrame with column 'id' increased to avoid duplicate primary keys.
> val newDF = mysqlDF.select((col("id") + 10).as("id"), col("country"), col("city"))
> newDF.printSchema()
> newDF.show()
>
> // Insert records into the MySQL table.
> newDF.write
>   .mode(SaveMode.Append)
>   .jdbc(jdbcUrl, table, properties)
>
> // Write to Hive - This creates a new table.
> newDF.write.saveAsTable("cities")
> newDF.show()
>
>
>
>
>

Re: Bug in Window Function

2018-07-25 Thread Jacek Laskowski
Hi Elior,

Could you show the query that led to the exception?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Wed, Jul 25, 2018 at 10:04 AM, Elior Malul  wrote:

> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> collect_set(named_struct(value, country#123 AS value#346, count,
> (cast(count(country#123) windowspecdefinition(campaign_id#104, app_id#93,
> country#123, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as
> double) / cast(count(1) windowspecdefinition(campaign_id#104, app_id#93,
> ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as double)) AS
> count#349) AS histogram_country#350, 0, 0)
> windowspecdefinition(campaign_id#104, app_id#93, ROWS BETWEEN UNBOUNDED
> PRECEDING AND UNBOUNDED FOLLOWING) AS
> collect_set(named_struct(NamePlaceholder(), country AS `value`,
> NamePlaceholder(), (CAST(count(country) OVER (PARTITION BY campaign_id,
> app_id, country UnspecifiedFrame) AS DOUBLE) / CAST(count(1) OVER
> (PARTITION BY campaign_id, app_id UnspecifiedFrame) AS DOUBLE)) AS `count`)
> AS `histogram_country`) OVER (PARTITION BY campaign_id, app_id
> UnspecifiedFrame)#352 has multiple Window Specifications
> (ArrayBuffer(windowspecdefinition(campaign_id#104, app_id#93, ROWS BETWEEN
> UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
> windowspecdefinition(campaign_id#104, app_id#93, country#123, ROWS BETWEEN
> UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))).
> Please file a bug report with this error message, stack trace, and the query.;
>


Re: Spark 2.4 release date

2018-06-18 Thread Jacek Laskowski
Hi,

What about https://issues.apache.org/jira/projects/SPARK/versions/12342385?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Mon, Jun 18, 2018 at 9:41 PM, Li Gao  wrote:

> Hello,
>
> Do we know the estimate when Spark 2.4 will be GA?
> We are evaluating whether to back port some of 2.4 fixes into our 2.3
> deployment.
>
> Thank you.
>


Re: Spark Structured Streaming is giving error “org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”

2018-05-28 Thread Jacek Laskowski
Hi,

Once you leave Spark Structured Streaming, right after you generate RDDs
(for your streaming queries), you can do any kind of "join". You're back
in the good old days of RDD programming (with all the bells and whistles).
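For the RDD side of the question, a hedged sketch (the socket source and the
reference data are made up): DStream.transform exposes each micro-batch as a plain
RDD, where a regular RDD join is fine.

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))             // sc as in spark-shell

val reference = sc.parallelize(Seq(("k1", "meta1"), ("k2", "meta2")))

val joined = ssc.socketTextStream("localhost", 9999)
  .map(line => (line, 1))
  .transform(rdd => rdd.join(reference))                    // plain RDD join per batch

joined.print()
// ssc.start(); ssc.awaitTermination() to actually run it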

Please note that Spark Structured Streaming != Spark Streaming since the
former uses Dataset API while the latter RDD API.

Don't touch RDD API and Spark Streaming unless you know what you're doing :)

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Tue, May 15, 2018 at 5:36 PM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> Hi Jacek,
>
> If we use RDD instead of DataFrame, can we accomplish the same? I mean, is
> joining between RDDs allowed in Spark Streaming?
>
> Best,
> Ravi
>
> On Sun, May 13, 2018 at 11:18 AM Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> The exception message should be self-explanatory and says that you cannot
>> join two streaming Datasets. This feature was added in 2.3 if I'm not
>> mistaken.
>>
>> Just to be sure that you work with two streaming Datasets, can you show
>> the query plan of the join query?
>>
>> Jacek
>>
>> On Sat, 12 May 2018, 16:57 ThomasThomas, <thomaspt...@gmail.com> wrote:
>>
>>> Hi There,
>>>
>>> Our use case is like this.
>>>
>>> We have a nested (multiple) JSON message flowing through a Kafka queue. We read
>>> the message from Kafka using Spark Structured Streaming (SSS), explode
>>> the data, flatten all data into a single record using DataFrame joins, and
>>> land it into a relational database table (DB2).
>>>
>>> But we are getting the following error when we write into db using JDBC.
>>>
>>> “org.apache.spark.sql.AnalysisException: Inner join between two
>>> streaming
>>> DataFrames/Datasets is not supported;”
>>>
>>> Any help would be greatly appreciated.
>>>
>>> Thanks,
>>> Thomas Thomas
>>> Mastermind Solutions LLC.
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: help with streaming batch interval question needed

2018-05-25 Thread Jacek Laskowski
Hi Peter,

> Basically I need to find a way to set the batch-interval in (b), similar
as in (a) below.

That's trigger method on DataStreamWriter.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

import org.apache.spark.sql.streaming.Trigger
df.writeStream.trigger(Trigger.ProcessingTime("1 second"))

See
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, May 24, 2018 at 10:14 PM, Peter Liu <peter.p...@gmail.com> wrote:

> Hi there,
>
> from my apache spark streaming website (see links below),
>
>- the batch-interval is set when a spark StreamingContext is
>constructed (see example (a) quoted below)
>- the StreamingContext is available in older and new Spark version
>(v1.6, v2.2 to v2.3.0) (see https://spark.apache.org/docs/
>1.6.0/streaming-programming-guide.html
><https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html>
>and https://spark.apache.org/docs/2.3.0/streaming-programming-
>guide.html )
>- however, example (b) below  doesn't use StreamingContext, but
>StreamingSession object to setup a streaming flow;
>
> What does the usage difference in (a) and (b) mean? I was wondering if
> this would mean a different streaming approach ("traditional" streaming vs
> structured streaming)?
>
> Basically I need to find a way to set the batch-interval in (b), similar
> as in (a) below.
>
> Would be great if someone can please share some insights here.
>
> Thanks!
>
> Peter
>
> (a)
> https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html )
>
> import org.apache.spark._
> import org.apache.spark.streaming._
>
> val conf = new SparkConf().setAppName(appName).setMaster(master)
> val ssc = new StreamingContext(conf, Seconds(1))
>
>
> (b)
> ( from databricks' https://databricks.com/blog/
> 2017/04/26/processing-data-in-apache-kafka-with-structured-
> streaming-in-apache-spark-2-2.html)
>
> val spark = SparkSession.builder()
>   .appName(appName)
>   .getOrCreate()
> ...
>
> jsonOptions = { "timestampFormat": nestTimestampFormat }
> parsed = spark \
>   .readStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "nest-logs") \
>   .load() \
>   .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
>
>
>
>
>


Re: Spark Structured Streaming is giving error “org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”

2018-05-13 Thread Jacek Laskowski
Hi,

The exception message should be self-explanatory and says that you cannot
join two streaming Datasets. This feature was added in 2.3 if I'm not
mistaken.

Just to be sure that you work with two streaming Datasets, can you show the
query plan of the join query?
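For reference, a hedged Spark 2.3+ sketch of a stream-stream inner join (the rate
source only keeps it self-contained; a real pipeline would typically add watermarks
to bound the join state):

import org.apache.spark.sql.functions.col

val left = spark.readStream.format("rate").load()
  .selectExpr("value AS leftKey", "timestamp AS leftTime")
val right = spark.readStream.format("rate").load()
  .selectExpr("value AS rightKey", "timestamp AS rightTime")

val joined = left.join(right, col("leftKey") === col("rightKey"))

joined.writeStream.format("console").start()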

Jacek

On Sat, 12 May 2018, 16:57 ThomasThomas,  wrote:

> Hi There,
>
> Our use case is like this.
>
> We have a nested (multiple) JSON message flowing through a Kafka queue. We read
> the message from Kafka using Spark Structured Streaming (SSS), explode
> the data, flatten all data into a single record using DataFrame joins, and
> land it into a relational database table (DB2).
>
> But we are getting the following error when we write into db using JDBC.
>
> “org.apache.spark.sql.AnalysisException: Inner join between two streaming
> DataFrames/Datasets is not supported;”
>
> Any help would be greatly appreciated.
>
> Thanks,
> Thomas Thomas
> Mastermind Solutions LLC.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Curious case of Spark SQL 2.3 - number of stages different for the same query ever?

2018-04-16 Thread Jacek Laskowski
Hi,

I've got a case where the same structured query (it's a union) gives 1 stage
for a run and 5 stages for another. I could not find any pattern yet (and
it's hard to reproduce it due to the volume and the application), but I'm
pretty certain that it's *never* possible that Spark 2.3 could come up with
1 vs 5 stages for the very same query plan (even if I changed number of
executors or number of cores or anything execution-related).

So my question is, is this possible that Spark SQL could give 1-stage
execution plan and 5-stage execution plan for the very same query?

(I am not saying that I'm 100% sure that the query is indeed the same since
I'm still working on a reproducible test case, and only when I've got it will I
really be sure.)

Sorry for the vague description, but I've got nothing more to share yet.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


Re: Run spark 2.2 on yarn as usual java application

2018-03-19 Thread Jacek Laskowski
Hi,

What's the deployment process then (if not using spark-submit)? How is the
AM deployed? Why would you want to skip spark-submit?

Jacek

On 19 Mar 2018 00:20, "Serega Sheypak"  wrote:

> Hi, is it even possible to run Spark on YARN as a usual Java application?
> I've built a jar using Maven with the spark-yarn dependency and I manually
> populate SparkConf with all the Hadoop properties.
> SparkContext fails to start with exception:
>
>1. Caused by: java.lang.IllegalStateException: Library directory
>'/hadoop/yarn/local/usercache/root/appcache/application_
>1521375636129_0022/container_e06_1521375636129_0022_01_
>02/assembly/target/scala-2.11/jars' does not exist; make sure Spark
>is built.
>2. at org.apache.spark.launcher.CommandBuilderUtils.checkState(Com
>mandBuilderUtils.java:260)
>3. at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(Co
>mmandBuilderUtils.java:359)
>4. at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(
>YarnCommandBuilderUtils.scala:38)
>
>
> I took a look at the code and it has some hardcoded values and checks for
> a specific file layout. I don't follow why :)
> Is it possible to bypass such checks?
>


Re: NPE in Subexpression Elimination optimization

2018-03-18 Thread Jacek Laskowski
Hi,

Filed https://issues.apache.org/jira/browse/SPARK-23731 and am working on
a workaround (aka fix).

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Fri, Mar 16, 2018 at 3:56 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> I'm working on a minimal test to reproduce the NPE exception that is
> thrown in the latest 2.3.0 and earlier 2.2.1 in subexpression elimination
> optimization, and am sending it to the mailing list hoping someone notices
> something familiar and would shed more light on what might be the root
> cause and how to write a test.
>
> I know why Spark throws the NPE technically since there's this @transient
> relation: HadoopFsRelation [1] that is not re-created at de-serialization
> on executors, but don't know why this @transient is required in the first
> place and more importantly how to write a test.
>
> Any hints appreciated.
>
> FYI Disabling subexpression elimination with spark.sql.
> subexpressionElimination.enabled Spark configuration property helps.
>
> [1] https://github.com/apache/spark/blob/branch-2.3/sql/
> core/src/main/scala/org/apache/spark/sql/execution/
> DataSourceScanExec.scala?utf8=%E2%9C%93#L159
>
> Caused by: java.lang.NullPointerException
>   at org.apache.spark.sql.execution.FileSourceScanExec.<
> init>(DataSourceScanExec.scala:167)
>   at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(
> DataSourceScanExec.scala:502)
>   at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(
> DataSourceScanExec.scala:158)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$
> lzycompute(QueryPlan.scala:210)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(
> QueryPlan.scala:209)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.
> apply(QueryPlan.scala:224)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.
> apply(QueryPlan.scala:224)
>   at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.
> doCanonicalize(QueryPlan.scala:224)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$
> lzycompute(QueryPlan.scala:210)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(
> QueryPlan.scala:209)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.
> apply(QueryPlan.scala:224)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.
> apply(QueryPlan.scala:224)
>   at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.
> doCanonicalize(QueryPlan.scala:224)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$
> lzycompute(QueryPlan.scala:210)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(
> QueryPlan.scala:209)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.
> apply(QueryPlan.scala:224)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.
> apply(QueryPlan.scala:224)
>   at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.
> doCanonicalize(QueryPlan.scala:224)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$
> lzycompute(QueryPlan.scala:210)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(
> QueryPlan.scala:209)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(
> QueryPlan.scala:257)
>   at org.apache.spark.sql.execution.ScalarSubquery.
> semanticEquals(subquery.sc

NPE in Subexpression Elimination optimization

2018-03-16 Thread Jacek Laskowski
)
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


Re: Are there any alternatives to Hive "stored by" clause as Spark 2.0 does not support it

2018-02-08 Thread Jacek Laskowski
Hi,

Since I'm new to Hive, what does `stored by` do? I might help a bit in
Spark if I only knew a bit about Hive :)

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Feb 8, 2018 at 7:25 AM, Pralabh Kumar <pralabhku...@gmail.com>
wrote:

> Hi
>
> Spark 2.0 doesn't support `stored by`. Is there any alternative to achieve
> the same?
>
>
>


Re: Apache Spark - Spark Structured Streaming - Watermark usage

2018-02-06 Thread Jacek Laskowski
Hi,

What would you expect? The data is simply dropped as that's the purpose of
watermarking it. That's my understanding at least.
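A hedged sketch of the mechanics (events is a hypothetical streaming Dataset with an
eventTime timestamp column): rows that arrive more than 10 minutes behind the max
event time seen so far no longer find their window state and are dropped.

import org.apache.spark.sql.functions.{col, window}

val lateTolerant = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"))
  .count()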

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Mon, Feb 5, 2018 at 8:11 PM, M Singh <mans2si...@yahoo.com> wrote:

> Just checking if anyone has more details on how watermark works in cases
> where event time is earlier than processing time stamp.
>
>
> On Friday, February 2, 2018 8:47 AM, M Singh <mans2si...@yahoo.com> wrote:
>
>
> Hi Vishu/Jacek:
>
> Thanks for your responses.
>
> Jacek - At the moment, the current time for my use case is processing time.
>
> Vishnu - Spark documentation (https://spark.apache.org/
> docs/latest/structured-streaming-programming-guide.html) does indicate
> that it can dedup using watermark.  So I believe there are more use cases
> for watermark and that is what I am trying to find.
>
> I am hoping that TD can clarify or point me to the documentation.
>
> Thanks
>
>
> On Wednesday, January 31, 2018 6:37 AM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>
> Hi Mans,
>
> Watermark in Spark is used to decide when to clear the state, so if the
> event is delayed beyond the point when the state is cleared by Spark, then it will
> be ignored.
> I recently wrote a blog post on this : http://vishnuviswanath.com/
> spark_structured_streaming.html#watermark
>
> Yes, this State is applicable for aggregation only. If you are having only
> a map function and don't want to process it, you could do a filter based on
> its EventTime field, but I guess you will have to compare it with the
> processing time since there is no API to access Watermark by the user.
>
> -Vishnu
>
> On Fri, Jan 26, 2018 at 1:14 PM, M Singh <mans2si...@yahoo.com.invalid>
> wrote:
>
> Hi:
>
> I am trying to filter out records which are lagging behind (based on event
> time) by a certain amount of time.
>
> Is the watermark api applicable to this scenario (ie, filtering lagging
> records) or it is only applicable with aggregation ?  I could not get a
> clear understanding from the documentation which only refers to it's usage
> with aggregation.
>
> Thanks
>
> Mans
>
>
>
>
>
>
>


Re: spark job error

2018-01-30 Thread Jacek Laskowski
Hi,

Start with spark.executor.memory 2g. You may also
give spark.yarn.executor.memoryOverhead a try.

See https://spark.apache.org/docs/latest/configuration.html and
https://spark.apache.org/docs/latest/running-on-yarn.html for more in-depth
information.
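As a hedged starting point, the two properties could look like this in
spark-defaults.conf or in Zeppelin's Spark interpreter settings (the values are only
an example to tune from; the overhead is in MiB):

spark.executor.memory                2g
spark.yarn.executor.memoryOverhead   1024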

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Tue, Jan 30, 2018 at 5:52 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> I am running Zeppelin on EMR with the default settings. I am getting the
> following error. Restarting the Zeppelin application fixes the problem.
>
> What default settings do I need to override that will help fix this error.
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 71 in stage 231.0 failed 4 times, most recent failure: Lost task 71.3 in
> stage 231.0 Reason: Container killed by YARN for exceeding memory limits.
> 1.4 GB of 1.4 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
> Thanks
>
>


Re: Apache Spark - Spark Structured Streaming - Watermark usage

2018-01-26 Thread Jacek Laskowski
Hi,

I'm curious how you would do the requirement "by a certain amount of time"
without a watermark? How would you know what's current and compute the lag?
Let's forget about watermark for a moment and see if it pops up as an
inevitable feature :)

"I am trying to filter out records which are lagging behind (based on event
time) by a certain amount of time."

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Fri, Jan 26, 2018 at 7:14 PM, M Singh <mans2si...@yahoo.com.invalid>
wrote:

> Hi:
>
> I am trying to filter out records which are lagging behind (based on event
> time) by a certain amount of time.
>
> Is the watermark api applicable to this scenario (ie, filtering lagging
> records) or it is only applicable with aggregation ?  I could not get a
> clear understanding from the documentation which only refers to it's usage
> with aggregation.
>
> Thanks
>
> Mans
>


Re: Best active groups, forums or contacts for Spark ?

2018-01-26 Thread Jacek Laskowski
Hi Esa,

I'd say https://stackoverflow.com/questions/tagged/apache-spark is where
many active sparkians hang out :)

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Fri, Jan 26, 2018 at 12:15 PM, Esa Heikkinen <
esa.heikki...@student.tut.fi> wrote:

> Hi
>
>
>
> It is very often difficult to get answers to questions about Spark in many
> forums. Maybe they are inactive or my questions are too bad. I don’t know,
> but does anyone know of good active groups, forums or contacts other than
> this one?
>
>
>
> Esa Heikkinen
>
>
>


Re: Inner join with the table itself

2018-01-15 Thread Jacek Laskowski
Hi Michael,

scala> spark.version
res0: String = 2.4.0-SNAPSHOT

scala> val r1 = spark.range(1)
r1: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> r1.as("left").join(r1.as("right")).filter($"left.id" === $"right.id
").show
+---+---+
| id| id|
+---+---+
|  0|  0|
+---+---+

Am I missing something? When aliasing a table, use the identifier in column
refs (inside).


Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Mon, Jan 15, 2018 at 3:26 PM, Michael Shtelma <mshte...@gmail.com> wrote:

> Hi Jacek & Gengliang,
>
> let's take a look at the following query:
>
> val pos = spark.read.parquet(prefix + "POSITION.parquet")
> pos.createOrReplaceTempView("POSITION")
> spark.sql("SELECT  POSITION.POSITION_ID  FROM POSITION POSITION JOIN
> POSITION POSITION1 ON POSITION.POSITION_ID0 = POSITION1.POSITION_ID
> ").collect()
>
> This query is working for me right now using spark 2.2.
>
> Now we can try implementing the same logic with DataFrame API:
>
> pos.join(pos, pos("POSITION_ID0")===pos("POSITION_ID")).collect()
>
> I am getting the following error:
>
> "Join condition is missing or trivial.
>
> Use the CROSS JOIN syntax to allow cartesian products between these
> relations.;"
>
> I have tried using alias function, but without success:
>
> val pos2 = pos.alias("P2")
> pos.join(pos2, pos("POSITION_ID0")===pos2("POSITION_ID")).collect()
>
> This also leads us to the same error.
> Am  I missing smth about the usage of alias?
>
> Now let's rename the columns:
>
> val pos3 = pos.toDF(pos.columns.map(_ + "_2"): _*)
> pos.join(pos3, pos("POSITION_ID0")===pos3("POSITION_ID_2")).collect()
>
> It works!
>
> There is one more really odd thing about all this: a colleague of mine
> has managed to get the same exception ("Join condition is missing or
> trivial") also using original SQL query, but I think he has been using
> empty tables.
>
> Thanks,
> Michael
>
>
> On Mon, Jan 15, 2018 at 11:27 AM, Gengliang Wang
> <gengliang.w...@databricks.com> wrote:
> > Hi Michael,
> >
> > You can use `Explain` to see how your query is optimized.
> > https://docs.databricks.com/spark/latest/spark-sql/
> language-manual/explain.html
> > I believe your query is an actual cross join, which is usually very slow
> in
> > execution.
> >
> > To get rid of this, you can set `spark.sql.crossJoin.enabled` as true.
> >
> >
> > 在 2018年1月15日,下午6:09,Jacek Laskowski <ja...@japila.pl> 写道:
> >
> > Hi Michael,
> >
> > -dev +user
> >
> > What's the query? How do you "fool spark"?
> >
> > Pozdrawiam,
> > Jacek Laskowski
> > 
> > https://about.me/JacekLaskowski
> > Mastering Spark SQL https://bit.ly/mastering-spark-sql
> > Spark Structured Streaming https://bit.ly/spark-structured-streaming
> > Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> > Follow me at https://twitter.com/jaceklaskowski
> >
> > On Mon, Jan 15, 2018 at 10:23 AM, Michael Shtelma <mshte...@gmail.com>
> > wrote:
> >>
> >> Hi all,
> >>
> >> If I try joining the table with itself using join columns, I am
> >> getting the following error:
> >> "Join condition is missing or trivial. Use the CROSS JOIN syntax to
> >> allow cartesian products between these relations.;"
> >>
> >> This is not true, and my join is not trivial and is not a real cross
> >> join. I am providing join condition and expect to get maybe a couple
> >> of joined rows for each row in the original table.
> >>
> >> There is a workaround for this, which implies renaming all the columns
> >> in source data frame and only afterwards proceed with the join. This
> >> allows us to fool spark.
> >>
> >> Now I am wondering if there is a way to get rid of this problem in a
> >> better way? I do not like the idea of renaming the columns because
> >> this makes it really difficult to keep track of the names in the
> >> columns in result data frames.
> >> Is it possible to deactivate this check?
> >>
> >> Thanks,
> >> Michael
> >>
> >> -
> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> >>
> >
> >
>


Re: Inner join with the table itself

2018-01-15 Thread Jacek Laskowski
Hi Michael,

-dev +user

What's the query? How do you "fool spark"?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Mon, Jan 15, 2018 at 10:23 AM, Michael Shtelma <mshte...@gmail.com>
wrote:

> Hi all,
>
> If I try joining the table with itself using join columns, I am
> getting the following error:
> "Join condition is missing or trivial. Use the CROSS JOIN syntax to
> allow cartesian products between these relations.;"
>
> This is not true, and my join is not trivial and is not a real cross
> join. I am providing join condition and expect to get maybe a couple
> of joined rows for each row in the original table.
>
> There is a workaround for this, which implies renaming all the columns
> in source data frame and only afterwards proceed with the join. This
> allows us to fool spark.
>
> Now I am wondering if there is a way to get rid of this problem in a
> better way? I do not like the idea of renaming the columns because
> this makes it really difficult to keep track of the names in the
> columns in result data frames.
> Is it possible to deactivate this check?
>
> Thanks,
> Michael
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-04 Thread Jacek Laskowski
Hi,

> If the data is very large then a collect may result in OOM.

That's a general case even in any part of Spark, incl. Spark Structured
Streaming. Why would you collect in addBatch? It's on the driver side and
as anything on the driver, it's a single JVM (and usually not fault
tolerant)

> Do you have any other suggestion/recommendation ?

What's wrong with the current solution? I don't think you should change how
you do things currently. You should just avoid collect on large datasets
(something you have to avoid anywhere in Spark, not just here).
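
As a hedged illustration only (the Sink trait is an internal API in Spark 2.x and may change), an addBatch that consumes the batch with foreach instead of materializing it on the driver with collect:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class RowByRowSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // foreach runs on the executors, so the whole batch is never gathered
    // into a single Array[Row] in driver memory the way collect() would
    data.foreach { row =>
      // hand the row over to the external system here
    }
  }
}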

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh <mans2si...@yahoo.com.invalid>
wrote:

> Thanks Tathagata for your answer.
>
> The reason I was asking about controlling data size is that the javadoc
> indicates you can use foreach or collect on the dataframe.  If the data is
> very large then a collect may result in OOM.
>
> From your answer it appears that the only way to control the size (in 2.2)
> would be to control the trigger interval. However, in my case, I have to dedup
> the elements in a one-minute interval, for which I am using a trigger interval
> and cannot reduce it.  Do you have any other suggestion/recommendation?
>
> Also, do you have any timeline for the availability of DataSourceV2/Spark
> 2.3 ?
>
> Thanks again.
>
>
> On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> 1. It is all the result data in that trigger. Note that it takes a
> DataFrame which is a purely logical representation of data and has no
> association with partitions, etc. which are physical representations.
>
> 2. If you want to limit the amount of data that is processed in a trigger,
> then you should either control the trigger interval or use the rate limit
> options on sources that support it (e.g. for kafka, you can use the option
> "maxOffsetsPerTrigger", see the guide
> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
> ).
>
> Related note, these APIs are subject to change. In fact in the upcoming
> release 2.3, we are adding a DataSource V2 API for
> batch/microbatch-streaming/continuous-streaming sources and sinks.
>
> On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2si...@yahoo.com.invalid>
> wrote:
>
> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>* Adds a batch of data to this sink. The data for a given `batchId` is
> deterministic and if
>* this method is called more than once with the same batchId (which
> will happen in the case of
>* failures), then `data` should only be added once.
>*
>* Note 1: You cannot apply any operators on `data` except consuming it
> (e.g., `collect/foreach`).
>* Otherwise, you may get a wrong result.
>*
>* Note 2: The method is supposed to be executed synchronously, i.e.
> the method should only return
>* after data is consumed by sink successfully.
>*/
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data is each DataFrame passed as the argument to
> addBatch -
> 1. Is it all the data in a partition for each trigger or is it all the
> data in that trigger ?
> 2. Is there a way to control the size in each addBatch invocation to make
> sure that we don't run into OOM exception on the executor while calling
> collect ?
>
> Thanks
>
>
>
>
>


Re: Is spark-env.sh sourced by Application Master and Executor for Spark on YARN?

2018-01-03 Thread Jacek Laskowski
Hi,

My understanding is that the AM (with the driver in cluster deploy mode) and
the executors are plain Java processes whose settings are set one by one
while submitting a Spark application for execution and creating the
ContainerLaunchContext for launching YARN containers. See
https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala?utf8=%E2%9C%93#L796-L801
for the code that maps the settings to properties.

With that I think conf/spark-defaults.conf won't be loaded by itself.

Why don't you set a property and see if it's available on the driver in
cluster deploy mode? That should give you a definitive answer (or at least
get you closer).
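
A quick way to run that experiment (MY_FLAG is a made-up variable name; the two properties are the documented YARN-specific ones, and sc stands for the application's SparkContext):

// conf/spark-defaults.conf (or --conf on spark-submit):
//   spark.yarn.appMasterEnv.MY_FLAG=from-conf   (env var for the AM/driver in cluster mode)
//   spark.executorEnv.MY_FLAG=from-conf         (env var for the executors)

// then, inside the submitted application -- check the driver:
println(s"driver sees MY_FLAG=${sys.env.get("MY_FLAG")}")

// and the executors (the output lands in the executors' stdout logs):
sc.parallelize(1 to 1, 1).foreach { _ =>
  println(s"executor sees MY_FLAG=${sys.env.get("MY_FLAG")}")
}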

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Wed, Jan 3, 2018 at 7:57 AM, John Zhuge <jzh...@apache.org> wrote:

> Hi,
>
> I am running Spark 2.0.0 and 2.1.1 on YARN in a Hadoop 2.7.3 cluster. Is
> spark-env.sh sourced when starting the Spark AM container or the executor
> container?
>
> Saw this paragraph on https://github.com/apache/spark/blob/master/docs/
> configuration.md:
>
> Note: When running Spark on YARN in cluster mode, environment variables
>> need to be set using the spark.yarn.appMasterEnv.[
>> EnvironmentVariableName] property in your conf/spark-defaults.conf file.
>> Environment variables that are set in spark-env.sh will not be reflected
>> in the YARN Application Master process in cluster mode. See the YARN-related
>> Spark Properties
>> <https://github.com/apache/spark/blob/master/docs/running-on-yarn.html#spark-properties>
>>  for
>> more information.
>
>
> Does it mean spark-env.sh will not be sourced when starting AM in cluster
> mode?
> Does this paragraph apply to executors as well?
>
> Thanks,
> --
> John Zhuge
>


Re: How to...UNION ALL of two SELECTs over different data sources in parallel?

2017-12-17 Thread Jacek Laskowski
Thanks Silvio!

In the meantime, with the help of Adam and a code review of WholeStageCodegenExec
and CollapseCodegenStages, I found out that anything that's codegen'd runs as
tasks within a single stage. In this case, the union of the two codegen'd
subtrees is indeed executed in parallel.
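
A quick way to see it for yourself in spark-shell (a sketch; what to look for is a single stage whose tasks cover both children of the Union):

val q = spark.range(1).union(spark.range(2))

// the physical plan shows a Union with two codegen'd Range children
q.explain()

// run it, then open the stage's Event Timeline in the web UI: the tasks
// reading both children belong to the same stage and overlap in time
q.foreach(_ => ())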

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Sat, Dec 16, 2017 at 7:12 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Jacek,
>
>
>
> Just replied to the SO thread as well, but…
>
>
>
> Yes, your first statement is correct. The DFs in the union are read in the
> same stage, so in your example where each DF has 8 partitions then you have
> a stage with 16 tasks to read the 2 DFs. There's no need to define the DF
> in a separate thread. You can verify this also in the Stage UI and looking
> at the Event Timeline. You should see the tasks across the DFs executing in
> parallel as expected.
>
>
>
> Here’s the UI for the following example, in which case each DF only has 1
> partition (so we get a stage with 2 tasks):
>
>
>
> spark.range(1, 100, 1, 1).write.save("/tmp/df1")
>
> spark.range(101, 200, 1, 1).write.save("/tmp/df2")
>
>
>
> spark.read.load("/tmp/df1").union(spark.read.load("/tmp/df2")).foreach {
> _ => }
>
>
>
>
>
> *From: *Jacek Laskowski <ja...@japila.pl>
> *Date: *Saturday, December 16, 2017 at 6:40 AM
> *To: *"user @spark" <user@spark.apache.org>
> *Subject: *How to...UNION ALL of two SELECTs over different data sources
> in parallel?
>
>
>
> Hi,
>
>
>
> I've been trying to find out the answer to the question about UNION ALL
> and SELECTs @ https://stackoverflow.com/q/47837955/1305344
>
>
>
> > If I have Spark SQL statement of the form SELECT [...] UNION ALL SELECT
> [...], will the two SELECT statements be executed in parallel? In my
> specific use case the two SELECTs are querying two different database
> tables. In contrast to what I would have expected, the Spark UI seems to
> suggest that the two SELECT statements are performed sequentially.
>
>
>
> How to know if the two separate SELECTs are executed in parallel or not?
> What are the tools to know it?
>
>
>
> My answer was to use the explain operator that would show...well...the physical
> plan, but I am not sure how to read it to know whether a query plan is going
> to be executed in parallel or not.
>
>
>
> I then used the underlying RDD lineage (using rdd.toDebugString) hoping
> that gives me the answer, but...I'm not so sure.
>
>
>
> For a query like the following:
>
>
>
> val q = spark.range(1).union(spark.range(2))
>
>
>
> I thought that since both SELECTs are codegen'ed they could be executed in
> parallel, but when I switched to the RDD lineage I lost my confidence given
> there's just one single stage (!)
>
>
>
> scala> q.rdd.toDebugString
>
> res4: String =
>
> (16) MapPartitionsRDD[17] at rdd at :26 []
>
>  |   MapPartitionsRDD[16] at rdd at :26 []
>
>  |   UnionRDD[15] at rdd at :26 []
>
>  |   MapPartitionsRDD[11] at rdd at :26 []
>
>  |   MapPartitionsRDD[10] at rdd at :26 []
>
>  |   ParallelCollectionRDD[9] at rdd at :26 []
>
>  |   MapPartitionsRDD[14] at rdd at :26 []
>
>  |   MapPartitionsRDD[13] at rdd at :26 []
>
>  |   ParallelCollectionRDD[12] at rdd at :26 []
>
>
>
> What am I missing and how to be certain whether and what parts of a query
> are going to be executed in parallel?
>
>
>
> Please help...
>
>
>
> Pozdrawiam,
>
> Jacek Laskowski
>
> 
>
> https://about.me/JacekLaskowski
>
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>
> Follow me at https://twitter.com/jaceklaskowski
>


How to...UNION ALL of two SELECTs over different data sources in parallel?

2017-12-16 Thread Jacek Laskowski
Hi,

I've been trying to find out the answer to the question about UNION ALL and
SELECTs @ https://stackoverflow.com/q/47837955/1305344

> If I have Spark SQL statement of the form SELECT [...] UNION ALL SELECT
[...], will the two SELECT statements be executed in parallel? In my
specific use case the two SELECTs are querying two different database
tables. In contrast to what I would have expected, the Spark UI seems to
suggest that the two SELECT statements are performed sequentially.

How to know if the two separate SELECTs are executed in parallel or not?
What are the tools to know it?

My answer was to use the explain operator that would show...well...the physical
plan, but I am not sure how to read it to know whether a query plan is going
to be executed in parallel or not.

I then used the underlying RDD lineage (using rdd.toDebugString) hoping
that gives me the answer, but...I'm not so sure.

For a query like the following:

val q = spark.range(1).union(spark.range(2))

I thought that since both SELECTs are codegen'ed they could be executed in
parallel, but when I switched to the RDD lineage I lost my confidence given
there's just one single stage (!)

scala> q.rdd.toDebugString
res4: String =
(16) MapPartitionsRDD[17] at rdd at :26 []
 |   MapPartitionsRDD[16] at rdd at :26 []
 |   UnionRDD[15] at rdd at :26 []
 |   MapPartitionsRDD[11] at rdd at :26 []
 |   MapPartitionsRDD[10] at rdd at :26 []
 |   ParallelCollectionRDD[9] at rdd at :26 []
 |   MapPartitionsRDD[14] at rdd at :26 []
 |   MapPartitionsRDD[13] at rdd at :26 []
 |   ParallelCollectionRDD[12] at rdd at :26 []

What am I missing and how to be certain whether and what parts of a query
are going to be executed in parallel?

Please help...

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


Re: is Union or Join Supported for Spark Structured Streaming Queries in 2.2.0?

2017-12-16 Thread Jacek Laskowski
Hi,

join between streaming and batch/static Datasets is supported for sure -->
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations

I'm not sure about union, but that's easy to check (and I am leaving it
as a home exercise).

You cannot have datasets of different schemas in a single query. You'd have to
use the widest schema to cover all of them.

p.s. Have you tried anything...spark-shell's your friend, my friend :)
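
A sketch of both, assuming Spark 2.2 with the built-in rate source standing in for the real streams (the actual topics and schemas in the question are unknown):

import org.apache.spark.sql.functions._
import spark.implicits._

// stream-static join (documented as supported)
val stream = spark.readStream.format("rate").load()          // columns: timestamp, value
val static = spark.range(10).withColumnRenamed("id", "value")
val joined = stream.join(static, "value")

// union: bring both sides to one wide schema first, then union them
val left    = stream.select($"value", $"timestamp", lit(null).cast("string").as("label"))
val right   = stream.select($"value", $"timestamp", lit("tagged").as("label"))
val unioned = left.union(right)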

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Wed, Dec 13, 2017 at 11:16 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I have messages in a queue that might be coming in with a few different
> schemas like
> msg 1 schema 1, msg2 schema2, msg3 schema3, msg 4 schema1
>
> I want to put all of this in one data frame. Is it possible with
> Structured Streaming?
>
> I am using Spark 2.2.0
>
> Thanks!
>
>


Re: Why Spark 2.2.1 still bundles old Hive jars?

2017-12-11 Thread Jacek Laskowski
Hi,

https://issues.apache.org/jira/browse/SPARK-19076

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 7:43 AM, An Qin <a...@qilinsoft.com> wrote:

> Hi, all,
>
>
>
> I want to include Sentry 2.0.0 in my Spark project. However it bundles
> Hive 2.3.2. I find the newest Spark 2.2.1 still bundles old Hive jars, for
> example, hive-exec-1.2.1.spark2.jar. Why doesn't it upgrade to the new Hive?
> Are they compatible?
>
>
>
> Regards,
>
>
>
>
>
> Qin An.
>
>
>
>
>
>
>


Re: Infer JSON schema in structured streaming Kafka.

2017-12-11 Thread Jacek Laskowski
Hi,

What about a custom streaming Sink that would stop the query after addBatch
has been called?
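
For reference, a sketch along the same lines that avoids both Thread.sleep and a custom Sink by leaning on the public StreamingQuery API (Spark 2.2+; broker address and topic name are placeholders):

import spark.implicits._

val sample = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")
  .writeStream
  .format("memory")
  .queryName("sample")
  .start()

sample.processAllAvailable()   // blocks until all currently available data is processed
sample.stop()                  // controlled stop instead of sleeping

// infer the schema from the sampled JSON strings
val inferredSchema = spark.read.json(spark.table("sample").as[String]).schema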

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi Jacek,
>
> For now, I am using Thread.sleep() on the driver to make sure my streaming
> query receives some data, and then stop it before the control reaches
> querying the memory table.
> Let me know if there is any better way of handling it.
>
> Regards,
> Satyajit.
>
> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi Jacek,
>>
>> Thank you for responding back,
>>
>> i have tried memory sink, and below is what i did
>>
>>  val fetchValue = debeziumRecords.selectExpr("value").withColumn("tableName",
>> functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>> .withColumn("operation", 
>> functions.get_json_object($"value".cast(StringType),
>> "$.payload.op"))
>> .withColumn("payloadAfterValue", 
>> split(substring_index(debeziumRecords("value"),
>> "\"after\":" ,-1),",\"source\"").getItem(0))
>> .drop("tableName").drop("operation").drop("value").as[String].writeStream
>>
>> .outputMode(OutputMode.Append())
>> .queryName("record")
>> .format("memory")
>> .start()
>>
>> spark.sql("select * from record").show(truncate = false) //i was
>> expecting to be able to use the record table to read the JSON string, but
>> the table is empty for the first call. And i do not see any dataframe
>> output after the first one
>>
>> *But yeah the above steps work good and i can do things that i need to,
>> in spark-shell, the problem is when i try to code in Intellij, because the
>> streaming query keeps running and i am not sure how to identify and stop
>> the streaming query and use record memory table.*
>>
>> So i would like to stop the streaming query once i know i have some data
>> in my record memory table(is there a way to do that), so i can stop the
>> streaming query and use the memory table, fetch my record.
>> Any help on how to approach the situation programmatically/any examples
>> pointed would highly be appreciated.
>>
>> Regards,
>> Satyajit.
>>
>>
>>
>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> What about memory sink? That could work.
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>>> satyajit.apas...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I would like to infer the JSON schema from a sample of data that I receive
>>>> from Kafka streams (a specific topic), and I have to infer the schema as I am
>>>> going to receive random JSON strings with a different schema for each topic,
>>>> so I chose to go ahead with the steps below:
>>>>
>>>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>>>> b. Somehow store the JSON string into a val and infer the schema.
>>>> c. Stop the stream.
>>>> d. Create a new readStream (smallest offset) and use the above inferred
>>>> schema to process the JSON using the Spark-provided JSON support, like
>>>> from_json, json_object and others, and run my actual business logic.
>>>>
>>>> Now I am not sure how to be successful with step (b). Any help would be
>>>> appreciated.
>>>> And I would also like to know if there is any better approach.
>>>>
>>>> Regards,
>>>> Satyajit.
>>>>
>>>
>>>
>>
>


Re: Infer JSON schema in structured streaming Kafka.

2017-12-10 Thread Jacek Laskowski
Hi,

What about memory sink? That could work.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> I would like to infer the JSON schema from a sample of data that I receive
> from Kafka streams (a specific topic), and I have to infer the schema as I am
> going to receive random JSON strings with a different schema for each topic,
> so I chose to go ahead with the steps below:
>
> a. readStream from Kafka (latest offset), from a single Kafka topic.
> b. Somehow store the JSON string into a val and infer the schema.
> c. Stop the stream.
> d. Create a new readStream (smallest offset) and use the above inferred schema
> to process the JSON using the Spark-provided JSON support, like from_json,
> json_object and others, and run my actual business logic.
>
> Now I am not sure how to be successful with step (b). Any help would be
> appreciated.
> And I would also like to know if there is any better approach.
>
> Regards,
> Satyajit.
>


Re: pyspark + from_json(col("col_name"), schema) returns all null

2017-12-10 Thread Jacek Laskowski
Hi,

Not that I'm aware of, but in your case checking whether a JSON message
fits your schema and the pipeline could have been done with pyspark alone,
with the JSONs on disk, couldn't it?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 12:49 AM, salemi <alireza.sal...@udo.edu> wrote:

> I found the root cause! There was a mismatch between the StructField type and
> the JSON message.
>
>
> Is there a good write up / wiki out there that describes how to debug spark
> jobs?
>
>
> Thanks
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


Re: RDD[internalRow] -> DataSet

2017-12-09 Thread Jacek Laskowski
Hi Satyajit,

That's exactly what Dataset.rdd does -->
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala?utf8=%E2%9C%93#L2916-L2921

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Fri, Dec 8, 2017 at 5:25 AM, satyajit vegesna <satyajit.apas...@gmail.com
> wrote:

> Hi All,
>
> Is there a way to convert RDD[internalRow] to Dataset , from outside spark
> sql package.
>
> Regards,
> Satyajit.
>


Re: Struct Type

2017-11-17 Thread Jacek Laskowski
Hi,

Use explode function, filter operator and collect_list function.

Or "heavier" flatMap.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Fri, Nov 17, 2017 at 6:06 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I have the following schema in a dataframe and I want to extract the key which
> matches MaxSpeed from the array, and its corresponding value.
>
> |-- tags: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- key: string (nullable = true)
>  |||-- value: string (nullable = true)
>
> is there any way to achieve it in dataframe?
>
> Thanks,
> Asmath
>


Re: Restart Spark Streaming after deployment

2017-11-16 Thread Jacek Laskowski
Hi,

You're right...killing the spark streaming job is the way to go. If a batch
was completed successfully, Spark Streaming will recover from the
controlled failure and start where it left off. I don't think there's any
other way to do it.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Wed, Nov 15, 2017 at 5:18 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am new in the usage of spark streaming. I have developed one spark
> streaming job which runs every 30 minutes with checkpointing directory.
>
> I have to implement a minor change. Shall I kill the spark streaming job
> once the batch is completed, using the yarn application -kill command, and
> update the jar file?
>
> The question I have is: if I follow the above approach, will spark streaming
> pick up data from the offset saved in the checkpoint after the restart?
>
> Are there any other, better approaches you have? Thanks in advance for your
> suggestions.
>
> Thanks,
> Asmath
>


Re: Structured Streaming and Hive

2017-09-30 Thread Jacek Laskowski
Hi,

Guessing it's a timing issue. Once you started the query, batch 0 either had
no rows to save or had not started yet (it runs on a separate thread), and so
spark.sql ran once and saved nothing.

You should rather use a foreach writer to save the results to Hive.
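
A skeleton of that approach (Spark 2.x ForeachWriter; the actual write, e.g. a Hive JDBC insert, is left as a stub since it depends on the cluster setup):

import org.apache.spark.sql.{ForeachWriter, Row}

// runs on the executors, one instance per partition and epoch
class HiveForeachWriter extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    // open a connection to Hive here (e.g. via Hive JDBC); false skips the partition
    true
  }
  override def process(row: Row): Unit = {
    // buffer or insert the row into the target Hive table
  }
  override def close(errorOrNull: Throwable): Unit = {
    // flush and close the connection
  }
}

// resultDS is a stand-in for the streaming Dataset produced by the mining step:
// resultDS.writeStream.foreach(new HiveForeachWriter).outputMode("append").start()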

Jacek

On 29 Sep 2017 11:36 am, "HanPan"  wrote:

> Hi guys,
>
>
>
>  I’m new to spark structured streaming. I’m using 2.1.0 and my
> scenario is reading a specific topic from kafka, doing some data mining
> tasks, then saving the result dataset to hive.
>
>  While writing data to hive, somehow it seems like it is not supported yet,
> and I tried this:
>
>It runs ok, but there is no result in hive.
>
>
>
>Any idea writing the stream result to hive?
>
>
>
> Thanks
>
> Pan
>
>
>
>
>


Re: How to read from multiple kafka topics using structured streaming (spark 2.2.0)?

2017-09-19 Thread Jacek Laskowski
Hi,

Cody's right.

subscribe - Topic subscription strategy that accepts topic names as a
comma-separated string, e.g. topic1,topic2,topic3 [1]

subscribepattern - Topic subscription strategy that uses Java’s
java.util.regex.Pattern for the topic subscription regex pattern of topics
to subscribe to, e.g. topic\d [2]

[1]
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html#subscribe
[2]
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html#subscribepattern
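
Side by side, the two options as they would appear in the reader (broker address and topic names are placeholders):

// an explicit, comma-separated list of topics
val multiTopic = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2,topic3")
  .load()

// or a regular expression over topic names
val byPattern = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "topic\\d")
  .load()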

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Tue, Sep 19, 2017 at 10:34 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You should be able to pass a comma separated string of topics to
> subscribe.  subscribePattern isn't necessary
>
>
>
> On Tue, Sep 19, 2017 at 2:54 PM, kant kodali <kanth...@gmail.com> wrote:
> > got it! Sorry.
> >
> > On Tue, Sep 19, 2017 at 12:52 PM, Jacek Laskowski <ja...@japila.pl>
> wrote:
> >>
> >> Hi,
> >>
> >> Use subscribepattern
> >>
> >> You haven't googled well enough -->
> >> https://jaceklaskowski.gitbooks.io/spark-structured-
> streaming/spark-sql-streaming-KafkaSource.html
> >> :)
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >> 
> >> https://about.me/JacekLaskowski
> >> Spark Structured Streaming (Apache Spark 2.2+)
> >> https://bit.ly/spark-structured-streaming
> >> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >> On Tue, Sep 19, 2017 at 9:50 PM, kant kodali <kanth...@gmail.com>
> wrote:
> >>>
> >>> HI All,
> >>>
> >>> I am wondering How to read from multiple kafka topics using structured
> >>> streaming (code below)? I googled prior to asking this question and I
> see
> >>> responses related to Dstreams but not structured streams. Is it
> possible to
> >>> read multiple topics using the same spark structured stream?
> >>>
> >>> sparkSession.readStream()
> >>> .format("kafka")
> >>> .option("kafka.bootstrap.servers", "localhost:9092")
> >>> .option("subscribe", "hello1")
> >>> .option("startingOffsets", "earliest")
> >>> .option("failOnDataLoss", "false")
> >>> .load();
> >>>
> >>>
> >>> Thanks!
> >>
> >>
> >
>


Re: Structured streaming coding question

2017-09-19 Thread Jacek Laskowski
Hi,

Ah, right! Start the queries and once they're running, awaitTermination
them.
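
In other words, a sketch over the pseudo code from the question (outputDS1, outputDS2 and KafkaSink are the asker's own names):

import org.apache.spark.sql.streaming.Trigger

// start both queries first...
val q1 = outputDS1.writeStream.trigger(Trigger.ProcessingTime(1000))
  .foreach(new KafkaSink("hello1")).start()
val q2 = outputDS2.writeStream.trigger(Trigger.ProcessingTime(1000))
  .foreach(new KafkaSink("hello2")).start()

// ...and only then block. Calling q1.awaitTermination() before starting q2
// would keep the second query from ever being started.
q1.awaitTermination()
q2.awaitTermination()

// alternatively: spark.streams.awaitAnyTermination()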

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Wed, Sep 20, 2017 at 7:09 AM, kant kodali <kanth...@gmail.com> wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
>
> Doesn't work
>
>
>
>
>
> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have the following pseudo code (I could paste the real code, however it
>> is pretty long and involves database calls inside the dataset.map operation and
>> so on), so I am just trying to simplify my question. I would like to know if
>> there is something wrong with the following pseudo code?
>>
>> DataSet inputDS = readFromKafka(topicName)
>>
>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>> Since I can see data getting populated
>>
>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as
>> well
>>
>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // Doesn't
>> work
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start().awaitTermination()
>>
>>
>> *So what's happening with above code is that I can see data coming out of
>> hello1 topic but not from hello2 topic.* I thought there is something
>> wrong with "outputDS2" so I switched the order  so now the code looks like
>> this
>>
>> DataSet inputDS = readFromKafka(topicName)
>>
>> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works
>> Since I can see data getting populated
>>
>> DataSet outputDS2 = mongoDS.map( readFromDatabase); // This Works
>>
>> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start().awaitTermination()
>>
>> *Now I can see data coming out from hello2 kafka topic but not from
>> hello1 topic*. *In  short, I can only see data from outputDS1 or
>> outputDS2 but not both. * At this point I am not sure what is going on?
>>
>> Thanks!
>>
>>
>>
>


Re: Structured streaming coding question

2017-09-19 Thread Jacek Laskowski
Hi,

What's the code in readFromKafka to read from hello2 and hello1?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming (Apache Spark 2.2+)
https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Tue, Sep 19, 2017 at 10:54 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I have the following pseudo code (I could paste the real code, however it
> is pretty long and involves database calls inside the dataset.map operation and
> so on), so I am just trying to simplify my question. I would like to know if
> there is something wrong with the following pseudo code?
>
> DataSet inputDS = readFromKafka(topicName)
>
> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works Since
> I can see data getting populated
>
> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>
> DataSet outputDS2 = mongoDS.map( readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start().awaitTermination()
>
>
> *So what's happening with above code is that I can see data coming out of
> hello1 topic but not from hello2 topic.* I thought there is something
> wrong with "outputDS2" so I switched the order  so now the code looks like
> this
>
> DataSet inputDS = readFromKafka(topicName)
>
> DataSet mongoDS = inputDS.map(insertIntoDatabase); // Works Since
> I can see data getting populated
>
> DataSet outputDS2 = mongoDS.map( readFromDatabase); // This Works
>
> DataSet outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>
> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello1")).start().awaitTermination()
>
> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
> KafkaSink("hello2")).start().awaitTermination()
>
> *Now I can see data coming out from hello2 kafka topic but not from hello1
> topic*. *In  short, I can only see data from outputDS1 or outputDS2 but
> not both. * At this point I am not sure what is going on?
>
> Thanks!
>
>
>


  1   2   3   4   5   >