Re: Jdbc Hook in Spark Batch Application

2020-12-24 Thread lec ssmi
Thanks.
But there is a problem: the classes referenced in the code would need to be
modified, and I want to avoid changing the existing code.

Gabor Somogyi wrote on Friday, December 25, 2020 at 12:16 AM:

> One can wrap the JDBC driver, and that way everything can be sniffed.
>
> On Thu, 24 Dec 2020, 03:51 lec ssmi,  wrote:
>
>> Hi:
>>    guys, I have some Spark programs that perform database connection
>> operations. I want to capture the connection information, such as the JDBC
>> connection properties, without being too intrusive to the code.
>>   Any good ideas? Can a Java agent do it?
>>
>>
>
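
A minimal sketch of the driver-wrapping idea, assuming a hypothetical SniffingDriver class (all names here are illustrative, not from the thread). It delegates every call to the real driver and only logs the URL and properties; whether Spark's JDBC code path actually goes through DriverManager depends on how the connections are created, so treat this as a sketch rather than a drop-in solution.

import java.sql.{Connection, Driver, DriverManager, DriverPropertyInfo}
import java.util.Properties
import java.util.logging.Logger
import scala.collection.JavaConverters._

// Hypothetical wrapper: delegates to the real driver, logging URL and properties.
class SniffingDriver(delegate: Driver) extends Driver {
  override def connect(url: String, info: Properties): Connection = {
    println(s"JDBC connect: url=$url props=$info") // or forward to a logger/metrics system
    delegate.connect(url, info)
  }
  override def acceptsURL(url: String): Boolean = delegate.acceptsURL(url)
  override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] =
    delegate.getPropertyInfo(url, info)
  override def getMajorVersion: Int = delegate.getMajorVersion
  override def getMinorVersion: Int = delegate.getMinorVersion
  override def jdbcCompliant(): Boolean = delegate.jdbcCompliant()
  override def getParentLogger: Logger = delegate.getParentLogger
}

object SniffingDriver {
  // Deregister the real driver and register the wrapper in its place, so that
  // connections handed out by DriverManager go through the wrapper.
  def install(realDriverClass: String): Unit = {
    val real = Class.forName(realDriverClass).newInstance().asInstanceOf[Driver]
    DriverManager.getDrivers.asScala
      .filter(_.getClass.getName == realDriverClass)
      .foreach(DriverManager.deregisterDriver)
    DriverManager.registerDriver(new SniffingDriver(real))
  }
}

SniffingDriver.install("com.mysql.jdbc.Driver") would have to run once per JVM; doing it from a Java agent's premain, as asked about below, is one way to keep it out of the existing application code.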


Jdbc Hook in Spark Batch Application

2020-12-23 Thread lec ssmi
Hi:
   guys, I have some Spark programs that perform database connection
operations. I want to capture the connection information, such as the JDBC
connection properties, without being too intrusive to the code.
  Any good ideas? Can a Java agent do it?


Re: [Spark Structured Streaming] Not working while worker node is on different machine

2020-12-23 Thread lec ssmi
Any more details about it?

bannya wrote on Friday, December 18, 2020 at 11:25 AM:

> Hi,
>
> I have a Spark Structured Streaming application that is reading data from a
> Kafka topic (16 partitions). I am using standalone mode. I have two worker
> nodes: one is on the same machine as the master and the other is on a
> different machine. Each worker node has 8 cores and 16G RAM with one
> executor.
>
> When I run the streaming application with only the worker node that is on the
> same machine as the master, the application works fine. But when I run the
> application with both worker nodes, the 8 tasks scheduled on worker node 1
> (the one on the same machine as the master) complete successfully, while the
> other 8 tasks scheduled on the second worker node get stuck in the RUNNING
> state and the application hangs.
>
> A normal (batch) Spark application runs fine with this setup.
>
> Can anyone help me with this?
>
>
>
>
>


Re: Printing Logs in map-partition

2020-12-22 Thread lec ssmi
The logs printed inside the map function live on the worker nodes. You can
access them there directly, or browse them through the executor logs in the web UI.
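
A minimal sketch of that, assuming log4j 1.x on the classpath (as bundled with Spark); the app and logger names are illustrative.

import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession

object PartitionLogging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("partition-logging").getOrCreate()

    val doubled = spark.sparkContext
      .parallelize(1 to 100, numSlices = 4)
      .mapPartitionsWithIndex { (idx, iter) =>
        // This closure runs on the executors, so the logger (and any println)
        // writes to the executor's stdout/stderr, visible in the worker's log
        // files or under the Executors tab of the web UI -- not on the driver console.
        val log = Logger.getLogger("partition.logger")
        log.info(s"processing partition $idx")
        iter.map(_ * 2)
      }

    doubled.count() // trigger the job so the partitions actually run
    spark.stop()
  }
}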

abby37 wrote on Wednesday, December 23, 2020 at 1:53 PM:

> I want to print some logs in the mapPartitions transformation to log the
> internal working of the function.
> I have used the following techniques without any success.
> 1. System.out.println()
> 2. System.err.println()
> 3. Log4j - logger.info
> 4. Log4j - logger.debug
>
> My code for mapPartitions is similar to this:
> https://github.com/broadinstitute/gatk/blob/master/src/main/java/org/broadinstitute/hellbender/tools/spark/bwa/BwaSparkEngine.java#L103
>
> Is there any way to print logs on the console in mapPartitions? Thanks for your
> time and help in advance.
>
>
>
>
>


Re: mysql connector java issue

2020-12-10 Thread lec ssmi
If you cannot assemble the JDBC driver jar into your application jar, you can
put the JDBC driver jar on the Spark classpath, generally
$SPARK_HOME/jars or $SPARK_HOME/lib.
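
For reference, a sketch of both options (the paths are illustrative; the jar and assembly names come from the command quoted below):

# Option 1: ship the driver jar with the application, so both driver and executors see it
spark-submit --master yarn \
  --jars /path/to/mysql-connector-java-5.1.24-bin.jar \
  ./Sdatahub-assembly-0.1.jar

# Option 2: copy the jar into the Spark installation on every node
cp /path/to/mysql-connector-java-5.1.24-bin.jar $SPARK_HOME/jars/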


Artemis User wrote on Friday, December 11, 2020 at 5:21 AM:

> What happened was that you made the mysql jar file available only to the
> Spark driver, not to the executors.  Use the --jars parameter instead of
> --driver-class-path to specify your third-party jar files, or copy the
> third-party jar files to the Spark jars directory in your HDFS and
> specify the HDFS path using --archives in spark-submit.
>
> -- ND
> On 12/10/20 10:02 AM, ismail elhammoud wrote:
>
> Hello,
>
> Guys, I have an issue with mysql-connector-java: even though I declared it in
> the sbt file, it doesn't work unless I give the whole path
>
> spark-submit --master yarn --driver-class-path
> /home/node2/Téléchargements/mysql-connector-java-5.1.24-bin.jar
> ./Sdatahub-assembly-0.1.jar
>
>
> Regards,
> Isma
>
>


Re: Using two WriteStreams in same spark structured streaming job

2020-11-04 Thread lec ssmi
You can use the *foreach* sink to achieve the logic you want.
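
One concrete way to follow this advice is through foreachBatch (available since Spark 2.4), where each micro-batch arrives as a plain DataFrame that can be written to both sinks with ordinary batch writers. The sketch below uses an illustrative rate source, topic name, and paths, and assumes the Delta Lake package is on the classpath; it is a sketch of the pattern, not code from this thread.

import org.apache.spark.sql.{DataFrame, SparkSession}

object TwoSinks {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("two-sinks").getOrCreate()

    // Illustrative streaming source; the real input would be the Kafka-backed DataFrame.
    val df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

    val query = df.writeStream
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        // Inside foreachBatch the micro-batch is a normal DataFrame, so the same
        // data can be written to several sinks.
        batch.persist()
        batch.write.format("delta").mode("append").save("/tmp/delta/events") // sink 1: Delta Lake
        batch.selectExpr("CAST(value AS STRING) AS value")
          .write.format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("topic", "events-out")
          .save() // sink 2: Kafka
        batch.unpersist()
      }
      .option("checkpointLocation", "/tmp/checkpoints/two-sinks")
      .start()

    query.awaitTermination()
  }
}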

act_coder wrote on Wednesday, November 4, 2020 at 9:56 PM:

> I have a scenario where I would like to save the same streaming dataframe to
> two different streaming sinks.
>
> I have created a streaming dataframe which I need to send to both Kafka
> topic and delta lake.
>
> I thought of using foreachBatch, but it looks like it doesn't support multiple
> STREAMING SINKS.
>
> Also, I tried using spark.streams.awaitAnyTermination() with multiple write
> streams, but the second stream is not getting processed!
>
> Is there a way through which we can achieve this ?!
>
>
>
>
>


Re: MongoDB plugin to Spark - too many open cursors

2020-10-25 Thread lec ssmi
Is the MongoDB connection pool full?

Daniel Stojanov wrote on Monday, October 26, 2020 at 10:28 AM:

> Hi,
>
>
> I receive an error message from the MongoDB server if there are too many
> Spark applications trying to access the database at the same time (about
> 3 or 4), "Cannot open a new cursor since too many cursors are already
> opened." I am not too sure of how to remedy this. I am not sure how the
> plugin behaves when it's pulling data.
>
> It appears that a given running application will open many connections
> to the database. The total number of cursors in the database's setting
> is many more than the number of read operations occurring in Spark.
>
>
> Does the plugin keep a connection/cursor open to the database even after
> it has pulled out the data into a dataframe?
>
> Why are there so many open cursors for a single read operation?
>
> Does catching the exception, sleeping for a while, then trying again
> make sense? If cursors are kept open throughout the life of the
> application this would not make sense.
>
>
> Plugin version: org.mongodb.spark:mongo-spark-connector_2.12:2.4.1
>
>
>
>


Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-20 Thread lec ssmi
Structured Streaming also uses a micro-batch mechanism under the hood. The
first batch often seems slower than the later ones; I run into this problem a
lot as well, and it feels related to how the batches are divided.
   On the other hand, Spark's batch size is usually bigger than Flume's
transaction batch size.
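
If the slowness of query 0 comes from a large initial backlog, one thing that may be worth trying (broker and topic names below are illustrative) is capping the per-trigger volume with maxOffsetsPerTrigger so the early micro-batches stay small:

import org.apache.spark.sql.SparkSession

object CappedKafkaRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("capped-kafka-read").getOrCreate()

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "earliest")   // reading the backlog ("chasing" the early data)
      .option("maxOffsetsPerTrigger", "10000") // at most ~10k records per micro-batch
      .load()

    stream.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/capped-kafka")
      .start()
      .awaitTermination()
  }
}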


KhajaAsmath Mohammed wrote on Wednesday, October 21, 2020 at 12:19 PM:

> Yes. Changing back to latest worked, but I still see the slowness compared
> to Flume.
>
>
> On Oct 20, 2020, at 10:21 PM, lec ssmi  wrote:
>
> 
> Did you start your application by chasing the early Kafka data (i.e., catching up on a backlog)?
>
> Lalwani, Jayesh wrote on Wednesday, October 21, 2020 at 2:19 AM:
>
>> Are you getting any output? Streaming jobs typically run forever and keep
>> processing data as it comes in. If a streaming job is working well, it will
>> typically generate output at a certain cadence.
>>
>>
>>
>> *From: *KhajaAsmath Mohammed 
>> *Date: *Tuesday, October 20, 2020 at 1:23 PM
>> *To: *"user @spark" 
>> *Subject: *[EXTERNAL] Spark Structured streaming - Kakfa - slowness with
>> query 0
>>
>>
>>
>>
>>
>>
>> Hi,
>>
>>
>>
>> I have started using Spark Structured Streaming for reading data from
>> Kafka and the job is very slow. The number of output rows keeps increasing in
>> query 0 and the job is running forever. Any suggestions for this, please?
>>
>>
>>
>> 
>>
>>
>>
>> Thanks,
>>
>> Asmath
>>
>


Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-20 Thread lec ssmi
Did you start your application by chasing the early Kafka data (i.e., catching up on a backlog)?

Lalwani, Jayesh wrote on Wednesday, October 21, 2020 at 2:19 AM:

> Are you getting any output? Streaming jobs typically run forever and keep
> processing data as it comes in. If a streaming job is working well, it will
> typically generate output at a certain cadence.
>
>
>
> *From: *KhajaAsmath Mohammed 
> *Date: *Tuesday, October 20, 2020 at 1:23 PM
> *To: *"user @spark" 
> *Subject: *[EXTERNAL] Spark Structured streaming - Kakfa - slowness with
> query 0
>
>
>
>
>
>
> Hi,
>
>
>
> I have started using Spark Structured Streaming for reading data from Kafka
> and the job is very slow. The number of output rows keeps increasing in query 0
> and the job is running forever. Any suggestions for this, please?
>
>
>
>
>
> Thanks,
>
> Asmath
>


Re: how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1

2020-10-13 Thread lec ssmi
Sorry, the mail title is a little off. It should read "How to disable or
replace ..".

lec ssmi wrote on Wednesday, October 14, 2020 at 9:27 AM:

> I have written a demo using spark 3.0.0, and the location where the
> checkpoint file is saved has been explicitly specified, like
>>
>> stream.option("checkpointLocation", "file:///C:\\Users\\Administrator\\Desktop\\test")
>
> But the app still throws an exception about the HDFS file system.
> Is it not possible to specify the local file system as a checkpoint
> location now?
>


how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1

2020-10-13 Thread lec ssmi
I have written a demo using spark 3.0.0, and the location where the
checkpoint file is saved has been explicitly specified, like
>
> stream.option("checkpointLocation", "file:///C:\\Users\\Administrator\\Desktop\\test")

But the app still throws an exception about the HDFS file system.
Is it not possible to specify the local file system as a checkpoint
location now?
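
A local checkpoint location is normally accepted when the URI is well formed; whether the backslashes in the path above are what actually triggers the HDFS error is only a guess, but a sketch with the same location written in forward-slash form would look like this (the rate source is illustrative):

import org.apache.spark.sql.SparkSession

object LocalCheckpointDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("local-checkpoint-demo").getOrCreate()

    val stream = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

    stream.writeStream
      .format("console")
      // file: URI with forward slashes, even on Windows
      .option("checkpointLocation", "file:///C:/Users/Administrator/Desktop/test")
      .start()
      .awaitTermination()
  }
}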


Re: [Structured Streaminig] multiple queries in one application

2020-05-03 Thread lec ssmi
For example, put the generated queries into a list, start each one, and then
call awaitTermination() on the last one.
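
A minimal sketch of that pattern (the rate sources and console sinks are illustrative): start every query, keep the handles, then block at the end.

import org.apache.spark.sql.SparkSession

object MultiQueryApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("multi-query").getOrCreate()

    // Two illustrative streaming sources; real jobs would read Kafka, files, etc.
    val s1 = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
    val s2 = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    // Start every query and keep the StreamingQuery handles.
    val queries = Seq(
      s1.writeStream.format("console").option("checkpointLocation", "/tmp/ckpt-q1").start(),
      s2.writeStream.format("console").option("checkpointLocation", "/tmp/ckpt-q2").start()
    )

    // Block on the last query, as suggested above...
    queries.last.awaitTermination()
    // ...or block until any one of them stops:
    // spark.streams.awaitAnyTermination()
  }
}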

Abhisheks wrote on Friday, May 1, 2020 at 10:32 AM:

> I hope you are using the Query object that is returned by Structured
> Streaming, right?
> The returned object contains a lot of information about each query, and
> tracking the state of the object should be helpful.
>
> Hope this helps; if not, can you please share more details with examples?
>
> Best,
> A
>
>
>
>
>


[Structured Streaminig] multiple queries in one application

2020-04-29 Thread lec ssmi
Hi:
   I run a lot of queries in one Spark Structured Streaming application. I
found that when one query fails, the other queries continue to run, but there
is no error information, so the running queries become fewer and fewer and we
can't find the reason. Is there any good solution for this situation?

Best
Lec Ssmi
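
One way to at least surface these silent failures (a sketch, not something discussed in the thread) is to register a StreamingQueryListener and log the exception carried by each QueryTerminatedEvent; from there the application could restart the failed query or stop the whole job instead of letting the number of running queries quietly shrink.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object QueryFailureLogger {
  def install(spark: SparkSession): Unit = {
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"query started: ${event.id}")

      override def onQueryProgress(event: QueryProgressEvent): Unit = () // too chatty to log here

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        // exception is Some(stack trace string) when the query died with an error.
        println(s"query terminated: ${event.id}, exception=${event.exception.getOrElse("none")}")
    })
  }
}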


Re: [Structured Streaming] NullPointerException in long running query

2020-04-28 Thread lec ssmi
It is probably a problem with my data quality. It's curious that the driver-side
exception stack has no specific exception information, though.

Edgardo Szrajber wrote on Tuesday, April 28, 2020 at 3:32 PM:

> The exception occurred while aborting the stage. It might be interesting to
> try to understand the reason the stage was aborted.
> Maybe a timeout? How long did the query run?
> Bentzi
>
>
> On Tue, Apr 28, 2020 at 9:25, Jungtaek Lim
>  wrote:
> The root cause of the exception occurred on the executor side ("Lost task 10.3
> in stage 1.0 (TID 81, spark6, executor 1)"), so you may need to check there.
>
> On Tue, Apr 28, 2020 at 2:52 PM lec ssmi  wrote:
>
> Hi:
>   One of my long-running queries occasionally encountered the following
> exception:
>
>
>   Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost
> task 10.3 in stage 1.0 (TID 81, spark6, executor 1):
> java.lang.NullPointerException
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
> at
> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org
> $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$

[Structured Streaming] NullPointerException in long running query

2020-04-27 Thread lec ssmi
Hi:
  One of my long-running queries occasionally encountered the following
exception:


  Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost
> task 10.3 in stage 1.0 (TID 81, spark6, executor 1):
> java.lang.NullPointerException
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
> at
> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org
> $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org
> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> ... 1 more



According to the exception stack, it seems to have nothing to do with the
logic of my code. Is this a Spark bug or something? The Spark version is
2.3.1.

Best
Lec Ssmi


Re: structured streaming Kafka consumer group.id override

2020-03-19 Thread lec ssmi
The last offset is stored in the file system you specified (the checkpoint
location), so how does it expire? I don't understand; I haven't run into that
situation.

Srinivas V wrote on Thursday, March 19, 2020 at 10:18 PM:

> 1. How would a prod admin user or other engineers understand which process
> this randomly generated group id consuming a specific topic belongs to? Why
> is it designed this way?
> 2. I don't see the group id changing all the time. It repeats across
> restarts; I am not able to understand when and how it changes. I know it is
> trying to get the next offset from the last one consumed, but it is failing
> as the offset has expired. What is the solution for this?
>
> On Thu, Mar 19, 2020 at 10:53 AM lec ssmi  wrote:
>
>> 1. Maybe we can't use a customized group id in Structured Streaming.
>> 2. When restarting after a failure or a kill, the group id changes, but the
>> starting offset will be the last one you consumed last time.
>>
>> Srinivas V wrote on Thursday, March 19, 2020 at 12:36 PM:
>>
>>> Hello,
>>> 1. My Kafka consumer name is being randomly generated by Spark
>>> Structured Streaming. Can I override this?
>>> 2. When testing in development, if I stop my Kafka consumer streaming job
>>> for a couple of days and try to start it back again, the job keeps
>>> failing for missing offsets, as the offsets expire after 4 hours. I
>>> read that when restarting to consume messages, SS always tries to get the
>>> earliest offset, not the latest offset. How do I handle this problem?
>>>
>>> Regards
>>> Srini
>>>
>>


Re: structured streaming Kafka consumer group.id override

2020-03-18 Thread lec ssmi
1. Maybe we can't use a customized group id in Structured Streaming.
2. When restarting after a failure or a kill, the group id changes, but the
starting offset will be the last one you consumed last time.
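
Two options that may help with the expired-offset restarts (a sketch; the group-id options are documented for the Spark 3.0 Kafka source, and the broker/topic names are illustrative):

import org.apache.spark.sql.SparkSession

object KafkaGroupIdExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-group-id").getOrCreate()

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      // Spark 3.0+: make the auto-generated consumer group recognizable, or pin it entirely.
      .option("groupIdPrefix", "my-app")           // prefix of the generated group id
      // .option("kafka.group.id", "my-app-group") // fixed group id (use with care)
      // Don't fail the query outright when the recorded offsets have already expired
      // on the broker; Spark logs the loss and continues from what is still available.
      .option("failOnDataLoss", "false")
      .load()

    stream.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/kafka-group-id")
      .start()
      .awaitTermination()
  }
}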

Srinivas V wrote on Thursday, March 19, 2020 at 12:36 PM:

> Hello,
> 1. My Kafka consumer name is being randomly generated by Spark Structured
> Streaming. Can I override this?
> 2. When testing in development, if I stop my Kafka consumer streaming job
> for a couple of days and try to start it back again, the job keeps
> failing for missing offsets, as the offsets expire after 4 hours. I
> read that when restarting to consume messages, SS always tries to get the
> earliest offset, not the latest offset. How do I handle this problem?
>
> Regards
> Srini
>


Re: Spark Streaming with mapGroupsWithState

2020-03-02 Thread lec ssmi
Maybe you can combine the fields you want to use into one field.
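
One concrete way to do that combination (a sketch; MyReport and its fields are stand-ins for the poster's case class) is to group by a tuple built from the fields, for which Spark's implicit encoders already exist. It is shown here with mapGroups on a small batch Dataset for brevity; the same composite key works with groupByKey followed by mapGroupsWithState.

import org.apache.spark.sql.{Dataset, SparkSession}

object CompositeKeyGrouping {
  // Stand-in for the poster's input type.
  case class MyReport(key1: String, key2: String, value: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("composite-key").getOrCreate()
    import spark.implicits._

    val reports: Dataset[MyReport] = Seq(
      MyReport("a", "x", 1L),
      MyReport("a", "y", 2L),
      MyReport("a", "x", 3L)
    ).toDS()

    // Group on both columns at once by returning a tuple from groupByKey;
    // (String, String) has an implicit encoder, so nothing extra is needed.
    val grouped = reports
      .groupByKey(r => (r.key1, r.key2))
      .mapGroups((key, rows) => (key._1, key._2, rows.map(_.value).sum))

    grouped.show()
    spark.stop()
  }
}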

Something Something wrote on Tuesday, March 3, 2020 at 6:37 AM:

> I am writing a Stateful Streaming application in which I am using
> mapGroupsWithState to create aggregates for Groups but I need to create 
> *Groups
> based on more than one column in the Input Row*. All the examples in the
> 'Spark: The Definitive Guide' use only one column such as 'User' or
> 'Device'. I am using code similar to what's given below. *How do I
> specify more than one field in the 'groupByKey'?*
>
> There are other challenges as well. The book says we can use
> 'updateAcrossEvents' the way given below but I get compile time error
> saying:
>
>
> *Error:(43, 65) missing argument list for method updateAcrossEvents in
> object Main
> Unapplied methods are only converted to functions when a
> function type is expected. You can make this conversion explicit by writing
> `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of
> `updateAcrossEvents`.
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
>
> Another challenge: the compiler also complains about my *MyReport*: 
> *Error:(41,
> 12) Unable to find encoder for type stored in a Dataset.  Primitive types
> (Int, String, etc) and Product types (case classes) are supported by
> importing spark.implicits._  Support for serializing other types will be
> added in future releases.*
>
> Help in resolving these errors would be greatly appreciated. Thanks in
> advance.
>
>
> withEventTime
> .as[MyReport]
>   .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2?
>   
> .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
>   .writeStream
>   .queryName("test_query")
>   .format("memory")
>   .outputMode("update")
>   .start()
>
>


Re: dropDuplicates and watermark in structured streaming

2020-02-27 Thread lec ssmi
  Such as:
df.withWatermark("time", "window size").dropDuplicates("id")
  .withWatermark("time", "real watermark")
  .groupBy(window("time", "window size", "window size"))
  .agg(count("id"))
   Can it make count(distinct id) work?


lec ssmi wrote on Friday, February 28, 2020 at 1:11 PM:

>   Such as:
> df.withWatermark("time", "window size").dropDuplicates("id")
>   .withWatermark("time", "real watermark")
>   .groupBy(window("time", "window size", "window size"))
>   .agg(count("id"))
>    Can it make count(distinct id) work?
>
> Tathagata Das wrote on Friday, February 28, 2020 at 10:25 AM:
>
>> 1. Yes. All times are in event time, not processing time. So you may get 10 AM
>> event-time data at 11 AM processing time, but it will still be compared
>> against all data within the 9-10 AM event-time range.
>>
>> 2. Show us your code.
>>
>> On Thu, Feb 27, 2020 at 2:30 AM lec ssmi  wrote:
>>
>>> Hi:
>>> I'm new to Structured Streaming. Because the built-in API cannot
>>> perform a windowed count-distinct operation, I want to use
>>> dropDuplicates first, and then perform the windowed count.
>>> But in the process of using it, there are two problems:
>>> 1. Because it is streaming computation, the deduplication state needs
>>> to be cleared in time, which requires the cooperation of a watermark.
>>> Assuming my event-time field is monotonically increasing, and I set the
>>> watermark to 1 hour, does that mean the data at 10 o'clock will only be
>>> compared against the data from 9 o'clock to 10 o'clock, and the data
>>> before 9 o'clock will be cleared?
>>> 2. Because it is window deduplication, I set the watermark before
>>> deduplication to the window size. But after deduplication, I need to
>>> call withWatermark() again to set the watermark to the real watermark.
>>> Will setting the watermark again take effect?
>>>
>>>  Thanks a lot!
>>>
>>


Re: dropDuplicates and watermark in structured streaming

2020-02-27 Thread lec ssmi
  Such as:
df.withWatermark("time", "window size").dropDuplicates("id")
  .withWatermark("time", "real watermark")
  .groupBy(window("time", "window size", "window size"))
  .agg(count("id"))
   Can it make count(distinct id) work?

Tathagata Das wrote on Friday, February 28, 2020 at 10:25 AM:

> 1. Yes. All times are in event time, not processing time. So you may get 10 AM
> event-time data at 11 AM processing time, but it will still be compared
> against all data within the 9-10 AM event-time range.
>
> 2. Show us your code.
>
> On Thu, Feb 27, 2020 at 2:30 AM lec ssmi  wrote:
>
>> Hi:
>> I'm new to structured streaming. Because the built-in API cannot
>> perform the Count Distinct operation of Window, I want to use
>> dropDuplicates first, and then perform the window count.
>>But in the process of using, there are two problems:
>>1. Because it is streaming computing, in the process of
>> deduplication, the state needs to be cleared in time, which requires the
>> cooperation of watermark. Assuming my event time field is consistently
>>   increasing, and I set the watermark to 1 hour, does it mean
>> that the data at 10 o'clock will only be compared in these data from 9
>> o'clock to 10 o'clock, and the data before 9 o'clock will be cleared ?
>>2. Because it is window deduplication, I set the watermark
>> before deduplication to the window size.But after deduplication, I need to
>> call withWatermark () again to set the watermark to the real
>>watermark. Will setting the watermark again take effect?
>>
>>  Thanks a lot !
>>
>


dropDuplicates and watermark in structured streaming

2020-02-27 Thread lec ssmi
Hi:
    I'm new to Structured Streaming. Because the built-in API cannot
perform a windowed count-distinct operation, I want to use dropDuplicates
first, and then perform the windowed count.
    But in the process of using it, there are two problems:
    1. Because it is streaming computation, the deduplication state needs
to be cleared in time, which requires the cooperation of a watermark.
Assuming my event-time field is monotonically increasing, and I set the
watermark to 1 hour, does that mean the data at 10 o'clock will only be
compared against the data from 9 o'clock to 10 o'clock, and the data before
9 o'clock will be cleared?
    2. Because it is window deduplication, I set the watermark before
deduplication to the window size. But after deduplication, I need to call
withWatermark() again to set the watermark to the real watermark. Will
setting the watermark again take effect?

 Thanks a lot!
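
A compilable sketch of the chained approach being asked about (the rate source, column names, and durations are illustrative; whether the second watermark behaves as hoped is exactly the open question in this thread):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, window}

object DedupThenWindowCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dedup-window-count").getOrCreate()

    // Illustrative event stream with an event-time column `time` and an `id` column.
    val events = spark.readStream
      .format("rate").option("rowsPerSecond", "10").load()
      .withColumnRenamed("timestamp", "time")
      .withColumn("id", col("value") % 100)

    val counted = events
      .withWatermark("time", "10 minutes") // watermark #1: the window size, bounding dedup state
      .dropDuplicates("id")
      .withWatermark("time", "1 hour")     // watermark #2: the "real" watermark (question 2 above)
      .groupBy(window(col("time"), "10 minutes"))
      .agg(count("id").as("dedup_id_count"))

    counted.writeStream
      .outputMode("update")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/dedup-window-count")
      .start()
      .awaitTermination()
  }
}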