Re: Jdbc Hook in Spark Batch Application
Thanks. But the problem is that the classes referenced in the code would need to be modified, and I would prefer not to change the existing code. Gabor Somogyi wrote on Fri, Dec 25, 2020 at 12:16 AM: > One can wrap the JDBC driver, and that way everything can be sniffed. > > On Thu, 24 Dec 2020, 03:51 lec ssmi wrote: > >> Hi guys, I have some Spark programs that perform database connection >> operations. I want to capture the connection information, such as the JDBC >> connection properties, without being too intrusive to the code. >> Any good ideas? Could a Java agent do it?
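A minimal sketch of the driver-wrapping idea, in Scala: a delegating java.sql.Driver that logs the URL and connection properties on every connect and forwards everything else to the real driver. The wrapped class name (com.mysql.jdbc.Driver) and the logging choice are assumptions; swap in whatever driver the jobs actually use.

import java.sql.{Connection, Driver, DriverPropertyInfo}
import java.util.Properties
import java.util.logging.Logger

// Delegating driver: captures connection info without touching application logic.
class SniffingDriver extends Driver {
  // Assumption: the real driver in use; replace with the actual class name.
  private val delegate: Driver = Class.forName("com.mysql.jdbc.Driver")
    .getDeclaredConstructor().newInstance().asInstanceOf[Driver]

  override def connect(url: String, info: Properties): Connection = {
    // Capture the connection information here (avoid logging passwords in production).
    println(s"JDBC connect: url=$url props=$info")
    delegate.connect(url, info)
  }

  override def acceptsURL(url: String): Boolean = delegate.acceptsURL(url)
  override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] =
    delegate.getPropertyInfo(url, info)
  override def getMajorVersion(): Int = delegate.getMajorVersion
  override def getMinorVersion(): Int = delegate.getMinorVersion
  override def jdbcCompliant(): Boolean = delegate.jdbcCompliant()
  override def getParentLogger(): Logger = delegate.getParentLogger
}

Pointing jobs at the wrapper still needs some hook, for example the JDBC "driver" option, a DriverManager registration in shared init code, or a Java agent that swaps the driver class name; so it is lower-touch than editing business logic, but not completely zero-change.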
Jdbc Hook in Spark Batch Application
Hi guys, I have some Spark programs that perform database connection operations. I want to capture the connection information, such as the JDBC connection properties, without being too intrusive to the code. Any good ideas? Could a Java agent do it?
Re: [Spark Structured Streaming] Not working while worker node is on different machine
Can you share any more detail about it? bannya wrote on Fri, Dec 18, 2020 at 11:25 AM: > Hi, > > I have a Spark Structured Streaming application that reads data from a > Kafka topic (16 partitions). I am using standalone mode. I have two worker > nodes: one is on the same machine as the master and the other is on a > different machine. Both worker nodes have 8 cores and 16G RAM with one > executor. > > When I run the streaming application with only the worker node that is on the > same machine as the master, the application works fine. But when I run the > application with both worker nodes, 8 tasks complete successfully on worker node 1 > (the one on the same machine as the master), while the other 8 tasks are scheduled on the > second worker node but get stuck in the RUNNING state, and the application hangs. > > A normal (batch) Spark application runs fine with this setup. > > Can anyone help me with this?
Re: Printing Logs in map-partition
The logs printed in the map function are written on the worker nodes; you can access them there directly, or browse them through the web UI. abby37 wrote on Wed, Dec 23, 2020 at 1:53 PM: > I want to print some logs in the mapPartitions transformation to log the > internal working of the function. > I have tried the following techniques without any success: > 1. System.out.println() > 2. System.err.println() > 3. Log4j - logger.info > 4. Log4j - logger.debug > > My code for mapPartitions is similar to this: > < https://github.com/broadinstitute/gatk/blob/master/src/main/java/org/broadinstitute/hellbender/tools/spark/bwa/BwaSparkEngine.java#L103> > . > > Is there any way to print logs to the console in mapPartitions? Thanks for your > time and for helping me in advance.
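A minimal sketch of logging from inside mapPartitions, assuming log4j is on the classpath (it ships with Spark). The point is that these messages land in each executor's stdout/stderr log files, reachable from the Executors tab of the web UI (or under the worker's work/ directory in standalone mode), not on the driver console.

import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession

object MapPartitionsLogging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("map-partitions-logging").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 100, 4)

    val doubled = rdd.mapPartitions { iter =>
      // Create the logger inside the closure so it is instantiated on the executor.
      val log = Logger.getLogger("MapPartitionsLogging")
      log.info(s"starting a partition on ${java.net.InetAddress.getLocalHost.getHostName}")
      iter.map { record =>
        log.debug(s"processing record $record")  // written to the executor's log, not the driver console
        record * 2
      }
    }

    println(s"sum = ${doubled.sum()}")  // this line, by contrast, prints on the driver
    spark.stop()
  }
}

If nothing shows up, check the executor-side log4j configuration: INFO and DEBUG messages may be filtered out by the default log level.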
Re: mysql connector java issue
If you cannot assemble the JDBC driver jar into your application jar, you can put the JDBC driver jar on the Spark classpath, generally $SPARK_HOME/jars or $SPARK_HOME/lib. Artemis User wrote on Fri, Dec 11, 2020 at 5:21 AM: > What happened is that you made the mysql jar file available only to the > Spark driver, not the executors. Use the --jars parameter instead of > --driver-class-path to specify your third-party jar files, or copy the > third-party jar files to the Spark jars directory in your HDFS, and > specify the HDFS path using --archives in spark-submit. > > -- ND > On 12/10/20 10:02 AM, ismail elhammoud wrote: > > Hello, > > Guys, I have an issue with the mysql connector java. Even though I declared it in the > sbt file, it doesn't work unless I give the whole path: > > spark-submit --master yarn --driver-class-path > /home/node2/Téléchargements/mysql-connector-java-5.1.24-bin.jar > ./Sdatahub-assembly-0.1.jar > > > Regards, > Isma
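For completeness, the same thing can be done programmatically; a hedged sketch in which the jar path, database host, table and credentials are placeholders. spark.jars ships the listed jars to the executors, which is what the JDBC read needs.

import org.apache.spark.sql.SparkSession

object MysqlJdbcExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("mysql-jdbc-example")
      .config("spark.jars", "/path/to/mysql-connector-java-5.1.24-bin.jar")  // distributed to executors
      .getOrCreate()

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")  // placeholder host/database
      .option("dbtable", "my_table")
      .option("user", "user")
      .option("password", "password")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    df.show()
    spark.stop()
  }
}

Depending on the deploy mode, the driver side may still need the jar via --jars or spark.driver.extraClassPath, as described in the reply above.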
Re: Using two WriteStreams in same spark structured streaming job
You can use the *foreach* sink to achieve the logic you want. act_coder wrote on Wed, Nov 4, 2020 at 9:56 PM: > I have a scenario where I would like to save the same streaming dataframe to > two different streaming sinks. > > I have created a streaming dataframe which I need to send to both a Kafka > topic and Delta Lake. > > I thought of using forEachBatch, but it doesn't seem to support multiple > STREAMING SINKS. > > I also tried using spark.streams.awaitAnyTermination() with multiple write > streams, but the second stream does not get processed. > > Is there a way to achieve this?
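A hedged sketch of the closely related foreachBatch approach, which hands every micro-batch to ordinary batch writers so a single query can feed both Kafka and Delta. Broker address, topic names, the Delta path and the checkpoint location are placeholders, and the Delta connector is assumed to be on the classpath.

import org.apache.spark.sql.{DataFrame, SparkSession}

object DualSink {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dual-sink").getOrCreate()

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events-in")
      .load()

    // Typed as a plain Scala function to sidestep the Scala/Java overload ambiguity of foreachBatch.
    val writeBoth: (DataFrame, Long) => Unit = (batchDf, batchId) => {
      batchDf.persist()  // reuse the same micro-batch for both writes
      batchDf.write.format("delta").mode("append").save("/data/delta/events")
      batchDf.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
        .write.format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("topic", "events-out")
        .save()
      batchDf.unpersist()
    }

    stream.writeStream
      .foreachBatch(writeBoth)
      .option("checkpointLocation", "/data/checkpoints/dual-sink")
      .start()
      .awaitTermination()
  }
}

If the two outputs must stay as genuinely separate streaming queries instead, each writeStream needs its own checkpoint location, and spark.streams.awaitAnyTermination() can be used to block on all of them.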
Re: MongoDB plugin to Spark - too many open cursors
Is the MongoDB connection pool full? Daniel Stojanov wrote on Mon, Oct 26, 2020 at 10:28 AM: > Hi, > > I receive an error message from the MongoDB server if there are too many > Spark applications trying to access the database at the same time (about > 3 or 4): "Cannot open a new cursor since too many cursors are already > opened." I am not too sure how to remedy this. I am not sure how the > plugin behaves when it is pulling data. > > It appears that a given running application opens many connections > to the database. The total number of cursors allowed in the database's settings > is many more than the number of read operations occurring in Spark. > > > Does the plugin keep a connection/cursor open to the database even after > it has pulled the data into a dataframe? > > Why are there so many open cursors for a single read operation? > > Does catching the exception, sleeping for a while, and then trying again > make sense? If cursors are kept open throughout the life of the > application, this would not make sense. > > > Plugin version: org.mongodb.spark:mongo-spark-connector_2.12:2.4.1
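On the last question (catch, sleep, retry): if the failure really is the server-side cursor limit, a bounded retry with backoff around the read is cheap to try. A hedged sketch; matching on the exception message is an assumption, so adjust it to whatever the connector actually throws in your environment.

import org.apache.spark.sql.{DataFrame, SparkSession}

object MongoReadWithRetry {
  // Retries the load a few times with exponential backoff when the cursor limit is hit.
  def readWithRetry(spark: SparkSession, uri: String, attempt: Int = 0, maxAttempts: Int = 5): DataFrame =
    try {
      spark.read.format("mongo").option("uri", uri).load()
    } catch {
      case e: Exception if attempt < maxAttempts - 1 &&
          Option(e.getMessage).exists(_.contains("too many cursors")) =>
        Thread.sleep(1000L * (1L << attempt))  // 1s, 2s, 4s, ...
        readWithRetry(spark, uri, attempt + 1, maxAttempts)
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mongo-retry").getOrCreate()
    val df = readWithRetry(spark, "mongodb://mongo-host:27017/db.collection")  // placeholder URI
    println(df.count())
    spark.stop()
  }
}

Note that load() is lazy, so the cursor pressure may only appear once an action runs; wrapping the action as well may be necessary. And if cursors really are held for the life of the application, retrying will not help and the connector's connection settings are the place to look.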
Re: Spark Structured Streaming - Kafka - slowness with query 0
Structured Streaming also uses a micro-batch mechanism under the hood. The first batch does seem slower than the later ones; I often run into this too, and it feels related to how the batches are divided. On the other hand, Spark's batch size is usually much bigger than a Flume transaction batch size. KhajaAsmath Mohammed wrote on Wed, Oct 21, 2020 at 12:19 PM: > Yes. Changing back to latest worked, but I still see the slowness compared > to Flume. > > Sent from my iPhone > > On Oct 20, 2020, at 10:21 PM, lec ssmi wrote: > > > Do you start your application by chasing the early Kafka data? > > Lalwani, Jayesh wrote on Wed, Oct 21, 2020 at 2:19 AM: > >> Are you getting any output? Streaming jobs typically run forever, and >> keep processing data as it comes in. If a streaming job is >> working well, it will typically generate output at a certain cadence. >> >> *From: *KhajaAsmath Mohammed >> *Date: *Tuesday, October 20, 2020 at 1:23 PM >> *To: *"user @spark" >> *Subject: *[EXTERNAL] Spark Structured Streaming - Kafka - slowness with >> query 0 >> >> Hi, >> >> I have started using Spark Structured Streaming to read data from >> Kafka, and the job is very slow. The number of output rows keeps increasing in >> query 0 and the job runs forever. Any suggestions for this, please? >> >> Thanks, >> >> Asmath >
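If the slowness is mostly the first batch chasing a large backlog, one knob worth trying is capping the per-trigger read from the Kafka source; a hedged sketch in which the broker, topic and cap value are placeholders.

import org.apache.spark.sql.SparkSession

object KafkaBatchCap {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-batch-cap").getOrCreate()

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "latest")      // or "earliest" if the backlog must be processed
      .option("maxOffsetsPerTrigger", "10000")  // upper bound on records pulled per micro-batch
      .load()

    stream.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/kafka-batch-cap")
      .start()
      .awaitTermination()
  }
}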
Re: Spark Structured Streaming - Kafka - slowness with query 0
Do you start your application by chasing the early Kafka data? Lalwani, Jayesh wrote on Wed, Oct 21, 2020 at 2:19 AM: > Are you getting any output? Streaming jobs typically run forever, and keep > processing data as it comes in. If a streaming job is working > well, it will typically generate output at a certain cadence. > > *From: *KhajaAsmath Mohammed > *Date: *Tuesday, October 20, 2020 at 1:23 PM > *To: *"user @spark" > *Subject: *[EXTERNAL] Spark Structured Streaming - Kafka - slowness with > query 0 > > Hi, > > I have started using Spark Structured Streaming to read data from Kafka, > and the job is very slow. The number of output rows keeps increasing in query 0 > and the job runs forever. Any suggestions for this, please? > > Thanks, > > Asmath
Re: how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1
Sorry, the mail title is a little problematic. It should be "How to disable or replace ...". lec ssmi wrote on Wed, Oct 14, 2020 at 9:27 AM: > I have written a demo using Spark 3.0.0, and the location where the > checkpoint files are saved has been explicitly specified, like >> >> stream.option("checkpointLocation","file:///C:\\Users\\Administrator >> \\Desktop\\test") > > But the app still throws an exception about the HDFS file system. > Is it not possible to specify the local file system as a checkpoint > location now?
how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1
I have written a demo using Spark 3.0.0, and the location where the checkpoint files are saved has been explicitly specified, like > > stream.option("checkpointLocation","file:///C:\\Users\\Administrator\\ > Desktop\\test") But the app still throws an exception about the HDFS file system. Is it not possible to specify the local file system as a checkpoint location now?
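A minimal sketch of a local-file-system checkpoint, assuming a Windows path like the one above. Note that the checkpoint code always goes through the Hadoop FileSystem API (several of the classes are named HDFS* even for file:// paths), so an HDFS-sounding stack trace does not necessarily mean HDFS is being used; on Windows a HADOOP_HOME/winutils setup is typically still required.

import org.apache.spark.sql.SparkSession

object LocalCheckpointDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("local-checkpoint-demo")
      .master("local[*]")
      .getOrCreate()

    val stream = spark.readStream.format("rate").load()  // built-in test source

    stream.writeStream
      .format("console")
      .option("checkpointLocation", "file:///C:/Users/Administrator/Desktop/test")  // forward slashes also work on Windows
      .start()
      .awaitTermination()
  }
}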
Re: [Structured Streaming] multiple queries in one application
For example, put the generated queries into a list, start each one, and then call awaitTermination() on the last one. Abhisheks wrote on Fri, May 1, 2020 at 10:32 AM: > I hope you are using the query object that is returned by Structured > Streaming, right? > The returned object contains a lot of information about each query, and tracking the > state of the object should be helpful. > > Hope this helps; if not, can you please share more details with examples? > > Best, > A
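A hedged sketch of that approach, with placeholder topic names and checkpoint paths. Awaiting the last handle works, but spark.streams.awaitAnyTermination() is often preferable because it returns as soon as any one of the queries stops or fails.

import org.apache.spark.sql.SparkSession

object ManyQueries {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("many-queries").getOrCreate()

    val topics = Seq("topic_a", "topic_b", "topic_c")

    val queries = topics.map { topic =>
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", topic)
        .load()
        .writeStream
        .format("console")
        .option("checkpointLocation", s"/tmp/checkpoints/$topic")  // one checkpoint dir per query
        .start()
    }

    spark.streams.awaitAnyTermination()  // or queries.last.awaitTermination() as described above
  }
}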
[Structured Streaming] multiple queries in one application
Hi: I run a lot of queries in one Spark Structured Streaming application. I found that when one query fails, the other queries continue to run, but no error information is surfaced. So the queries become fewer and fewer, and we can't find the reason. Is there any good solution for this situation? Best Lec Ssmi
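One hedged way to make a dying query visible instead of silent is to register a StreamingQueryListener on the session; onQueryTerminated carries the exception message when a query fails. Where the message goes (stderr here) is an assumption; in practice it would feed a logger or an alert.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object QueryMonitor {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("query-monitor").getOrCreate()

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        System.err.println(s"query started: ${event.name} (${event.id})")

      override def onQueryProgress(event: QueryProgressEvent): Unit = ()  // progress not needed here

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        // exception is Some(message) when the query failed, None when it stopped cleanly
        System.err.println(s"query ${event.id} terminated, exception = ${event.exception}")
    })

    // ... start the streaming queries here, then block, e.g. spark.streams.awaitAnyTermination()
  }
}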
Re: [Structured Streaming] NullPointerException in long running query
It is probably a problem with my data quality. It's curious that the driver-side exception stack has no specific exception information. Edgardo Szrajber wrote on Tue, Apr 28, 2020 at 3:32 PM: > The exception occurred while aborting the stage. It might be interesting to > try to understand the reason for the abort. > Maybe a timeout? How long did the query run? > Bentzi > > Sent from Yahoo Mail on Android > > On Tue, Apr 28, 2020 at 9:25, Jungtaek Lim > wrote: > The root cause of the exception occurred on the executor side ("Lost task 10.3 > in stage 1.0 (TID 81, spark6, executor 1)"), so you may need to check there. > > On Tue, Apr 28, 2020 at 2:52 PM lec ssmi wrote: > > Hi: > One of my long-running queries occasionally encountered the following > exception: > > > Caused by: org.apache.spark.SparkException: Job aborted due to stage > failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost > task 10.3 in stage 1.0 (TID 81, spark6, executor 1): > java.lang.NullPointerException > Driver stacktrace: > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099) > at > org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929) > at > org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) > at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927) > at > org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49) > at >
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org > $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivate
[Structured Streaming] NullPointerException in long running query
Hi: One of my long-running queries occasionally encountered the following exception: Caused by: org.apache.spark.SparkException: Job aborted due to stage > failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost > task 10.3 in stage 1.0 (TID 81, spark6, executor 1): > java.lang.NullPointerException > Driver stacktrace: > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099) > at > org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929) > at > org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) > at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927) > at > org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org > $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472) > at > 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) > at org.apache.spark.sql.execution.streaming.StreamExecution.org > $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) > ... 1 more According to the exception stack, it seems to have nothing to do with the logic of my code.Is this a spark bug or something? The version of spark is 2.3.1. Best Lec Ssmi
Re: structured streaming Kafka consumer group.id override
The last offset is stored in the file system you specified; how does it expire? I don't understand, as I haven't run into that condition. Srinivas V wrote on Thu, Mar 19, 2020 at 10:18 PM: > 1. How would a production admin or other engineers understand which process > owns this random group id that is consuming a specific topic? Why is it > designed this way? > 2. I don't see the group id changing all the time. It repeats on > restarts. I am not able to understand when and how it changes. I know it is > trying to get the next offset from the last consumed one, but it is failing because the > offset has expired. What is the solution for this? > > On Thu, Mar 19, 2020 at 10:53 AM lec ssmi wrote: > >> 1. Maybe we can't use a customized group id in Structured Streaming. >> 2. When restarting after a failure or kill, the group id changes, but the >> starting offset will be the last one consumed last time. >> >> Srinivas V wrote on Thu, Mar 19, 2020 at 12:36 PM: >> >>> Hello, >>> 1. My Kafka consumer name is being randomly generated by Spark >>> Structured Streaming. Can I override this? >>> 2. When testing in development, when I stop my Kafka consumer streaming job >>> for a couple of days and try to start it back again, the job keeps >>> failing for missing offsets, as the offsets expire after 4 hours. I >>> read that when restarting to consume messages, SS always tries to get the >>> earliest offset, not the latest offset. How do I handle this problem? >>> >>> Regards >>> Srini
Re: structured streaming Kafka consumer group.id override
1. Maybe we can't use a customized group id in Structured Streaming. 2. When restarting after a failure or kill, the group id changes, but the starting offset will be the last one consumed last time. Srinivas V wrote on Thu, Mar 19, 2020 at 12:36 PM: > Hello, > 1. My Kafka consumer name is being randomly generated by Spark Structured > Streaming. Can I override this? > 2. When testing in development, when I stop my Kafka consumer streaming job > for a couple of days and try to start it back again, the job keeps > failing for missing offsets, as the offsets expire after 4 hours. I > read that when restarting to consume messages, SS always tries to get the > earliest offset, not the latest offset. How do I handle this problem? > > Regards > Srini
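On the expired-offsets part of the question, a hedged sketch of the Kafka source options that matter when the offsets recorded in the checkpoint have already been aged out by retention; broker and topic names are placeholders.

import org.apache.spark.sql.SparkSession

object KafkaExpiredOffsets {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-expired-offsets").getOrCreate()

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "latest")  // only consulted on the very first start, before a checkpoint exists
      .option("failOnDataLoss", "false")    // skip offset ranges that have expired instead of failing the query
      .load()

    stream.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/kafka-expired-offsets")
      .start()
      .awaitTermination()
  }
}

Skipping missing ranges means silently dropping the expired data, so whether failOnDataLoss=false is acceptable depends on the use case; the alternatives are longer Kafka retention or deleting the checkpoint to start fresh.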
Re: Spark Streaming with mapGroupsWithState
Maybe you can combine the fields you want to use into one field. Something Something wrote on Tue, Mar 3, 2020 at 6:37 AM: > I am writing a stateful streaming application in which I am using > mapGroupsWithState to create aggregates for groups, but I need to create > *groups based on more than one column in the input row*. All the examples in > 'Spark: The Definitive Guide' use only one column, such as 'User' or > 'Device'. I am using code similar to what's given below. *How do I > specify more than one field in the 'groupByKey'?* > > There are other challenges as well. The book says we can use > 'updateAcrossEvents' the way given below, but I get a compile-time error > saying: > > *Error:(43, 65) missing argument list for method updateAcrossEvents in > object Main. Unapplied methods are only converted to functions when a > function type is expected. You can make this conversion explicit by writing > `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of > `updateAcrossEvents`. > .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)* > > Another challenge: the compiler also complains about my *MyReport*: *Error:(41, > 12) Unable to find encoder for type stored in a Dataset. Primitive types > (Int, String, etc) and Product types (case classes) are supported by > importing spark.implicits._ Support for serializing other types will be > added in future releases.* > > Help in resolving these errors would be greatly appreciated. Thanks in > advance. > > > withEventTime > .as[MyReport] > .groupByKey(_.getKeys.getKey1). // How do I add _.getKeys.getKey2? > > .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents) > .writeStream > .queryName("test_query") > .format("memory") > .outputMode("update") > .start()
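A hedged sketch of the combine-the-fields suggestion using a tuple key, with toy case classes standing in for the asker's MyReport. It also shows the two fixes the compiler errors point at: pass the state function with an explicit `_`, and keep the key and state types as case classes or tuples so spark.implicits._ can supply encoders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object CompositeKeyState {
  // Toy stand-ins for the asker's types.
  case class Keys(key1: String, key2: String)
  case class MyReport(keys: Keys, value: Long)
  case class Agg(key1: String, key2: String, total: Long)

  // State function: (key, new rows, state) => output row.
  def updateAcrossEvents(key: (String, String), rows: Iterator[MyReport], state: GroupState[Agg]): Agg = {
    val prev = state.getOption.getOrElse(Agg(key._1, key._2, 0L))
    val updated = prev.copy(total = prev.total + rows.map(_.value).sum)
    state.update(updated)
    updated
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("composite-key-state").getOrCreate()
    import spark.implicits._

    val reports = spark.readStream.format("rate").load()
      .selectExpr("CAST(value % 10 AS STRING) AS k", "value")
      .map(r => MyReport(Keys(r.getString(0), r.getString(0)), r.getLong(1)))

    val aggregates = reports
      .groupByKey(r => (r.keys.key1, r.keys.key2))                              // composite key as a tuple
      .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateAcrossEvents _)  // note the explicit `_`

    aggregates.writeStream
      .queryName("test_query")
      .format("console")
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}

The EventTimeTimeout used in the book also requires a withWatermark on the event-time column before mapGroupsWithState; NoTimeout is used here only to keep the sketch minimal.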
Re: dropDuplicates and watermark in structured streaming
Such as: df.withWatermark("time","window size").dropDuplicates("id").withWatermark("time","real watermark").groupBy(window("time","window size","window size")).agg(count("id")) Can it make count(distinct id) succeed? lec ssmi wrote on Fri, Feb 28, 2020 at 1:11 PM: > Such as: > df.withWatermark("time","window size").dropDuplicates("id").withWatermark("time","real > watermark").groupBy(window("time","window size","window > size")).agg(count("id")) > Can it make count(distinct id) succeed? > > Tathagata Das wrote on Fri, Feb 28, 2020 at 10:25 AM: > >> 1. Yes. All times are in event time, not processing time. So you may get 10 AM >> event-time data at 11 AM processing time, but it will still be compared >> against all data within the 9-10 AM event-time range. >> >> 2. Show us your code. >> >> On Thu, Feb 27, 2020 at 2:30 AM lec ssmi wrote: >> >>> Hi: >>> I'm new to Structured Streaming. Because the built-in API cannot >>> perform a windowed count-distinct operation, I want to use >>> dropDuplicates first and then perform the windowed count. >>> But in the process of using this, there are two problems: >>> 1. Because it is streaming computation, during >>> deduplication the state needs to be cleared in time, which requires the >>> cooperation of the watermark. Assuming my event-time field is consistently >>> increasing and I set the watermark to 1 hour, does it >>> mean that the data at 10 o'clock will only be compared against the data from 9 >>> o'clock to 10 o'clock, and the data before 9 o'clock will be cleared? >>> 2. Because it is window deduplication, I set the watermark >>> before deduplication to the window size. But after deduplication, I need to >>> call withWatermark() again to set the watermark to the real >>> watermark. Will setting the watermark again take effect? >>> >>> Thanks a lot!
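A runnable rendering of that snippet with placeholder column names and sizes (10-minute windows, a 1-hour "real" watermark), reading from a hypothetical Kafka topic. Whether the second withWatermark takes effect as intended is exactly the open question here; note also that the streaming guide recommends including the event-time column in the dropDuplicates keys so the watermark can age out old deduplication state.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, window}

object DedupThenWindowCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dedup-then-window-count").getOrCreate()

    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS id", "timestamp AS time")

    val counts = events
      .withWatermark("time", "10 minutes")   // watermark = window size, to bound the dedup state
      .dropDuplicates("id")                  // the guide suggests dropDuplicates("id", "time") for state eviction
      .withWatermark("time", "1 hour")       // the "real" watermark for the aggregation
      .groupBy(window(col("time"), "10 minutes"))
      .agg(count("id").as("distinct_ids"))

    counts.writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/dedup-count")
      .start()
      .awaitTermination()
  }
}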
Re: dropDuplicates and watermark in structured streaming
Such as: df.withWatermark("time","window size").dropDuplicates("id").withWatermark("time","real watermark").groupBy(window("time","window size","window size")).agg(count("id")) Can it make count(distinct id) succeed? Tathagata Das wrote on Fri, Feb 28, 2020 at 10:25 AM: > 1. Yes. All times are in event time, not processing time. So you may get 10 AM > event-time data at 11 AM processing time, but it will still be compared > against all data within the 9-10 AM event-time range. > > 2. Show us your code. > > On Thu, Feb 27, 2020 at 2:30 AM lec ssmi wrote: > >> Hi: >> I'm new to Structured Streaming. Because the built-in API cannot >> perform a windowed count-distinct operation, I want to use >> dropDuplicates first and then perform the windowed count. >> But in the process of using this, there are two problems: >> 1. Because it is streaming computation, during >> deduplication the state needs to be cleared in time, which requires the >> cooperation of the watermark. Assuming my event-time field is consistently >> increasing and I set the watermark to 1 hour, does it mean >> that the data at 10 o'clock will only be compared against the data from 9 >> o'clock to 10 o'clock, and the data before 9 o'clock will be cleared? >> 2. Because it is window deduplication, I set the watermark >> before deduplication to the window size. But after deduplication, I need to >> call withWatermark() again to set the watermark to the real >> watermark. Will setting the watermark again take effect? >> >> Thanks a lot!
dropDuplicates and watermark in structured streaming
Hi: I'm new to Structured Streaming. Because the built-in API cannot perform a windowed count-distinct operation, I want to use dropDuplicates first and then perform the windowed count. But in the process of using this, there are two problems: 1. Because it is streaming computation, during deduplication the state needs to be cleared in time, which requires the cooperation of the watermark. Assuming my event-time field is consistently increasing and I set the watermark to 1 hour, does it mean that the data at 10 o'clock will only be compared against the data from 9 o'clock to 10 o'clock, and the data before 9 o'clock will be cleared? 2. Because it is window deduplication, I set the watermark before deduplication to the window size. But after deduplication, I need to call withWatermark() again to set the watermark to the real watermark. Will setting the watermark again take effect? Thanks a lot!