Re: [Pyspark 2.4] Best way to define activity within different time window
Thank you both for your input! To calculate the moving average of active users, could you comment on whether to go for an RDD-based implementation or a DataFrame? If a DataFrame, will window functions work here? In general, how would Spark behave when working with a DataFrame with date, week, month, quarter, and year columns and grouping by each one, one by one? On Sun, Jun 9, 2019 at 1:17 PM Jörn Franke wrote: > Depending on what accuracy is needed, HyperLogLogs can be an interesting > alternative: > https://en.m.wikipedia.org/wiki/HyperLogLog > > On 09.06.2019 at 15:59, big data wrote: > > In my opinion, a bitmap is the best solution for active-user calculation. > Most other solutions are based on a count(distinct) calculation, which > is much slower. > > If you've implemented a bitmap solution, including how to build the bitmap and how > to load the bitmap, then a bitmap is the best choice. > On 2019/6/5 at 6:49 PM, Rishi Shah wrote: > > Hi All, > > Is there a best practice around calculating daily, weekly, monthly, > quarterly, and yearly active users? > > One approach is to create a window of daily bitmaps and aggregate them by > period later. However, I was wondering if anyone has a better approach to > tackling this problem. > > -- > Regards, > > Rishi Shah > > -- Regards, Rishi Shah
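The daily-bitmap approach discussed in this thread can be sketched in plain Python (illustrative only, not any poster's implementation; in practice one would use a compressed bitmap such as RoaringBitmap, or HyperLogLog for approximate counts). Each day's activity is stored as one bitmap with user IDs as bit positions; weekly, monthly, or yearly active users are then a bitwise OR of the daily bitmaps followed by a popcount, with no count(distinct) needed.

```python
# Sketch of the daily-bitmap approach to active-user counting.
# A Python int serves as the bitmap: bit i set => user i was active.

def build_daily_bitmap(active_user_ids):
    """Set one bit per active user id for a single day."""
    bm = 0
    for uid in active_user_ids:
        bm |= 1 << uid
    return bm

def active_count(bitmap):
    """Distinct active users = popcount of the bitmap."""
    return bin(bitmap).count("1")

def period_active(daily_bitmaps):
    """Distinct users active on any day of the period (OR of dailies)."""
    merged = 0
    for bm in daily_bitmaps:
        merged |= bm
    return active_count(merged)

# Example: three days of activity
day1 = build_daily_bitmap([1, 2, 3])
day2 = build_daily_bitmap([2, 3, 4])
day3 = build_daily_bitmap([4, 5])

print(active_count(day1))                 # daily active users for day 1 -> 3
print(period_active([day1, day2, day3]))  # period rollup, users {1..5} -> 5
```

Because the OR is associative, daily bitmaps can be pre-aggregated into weekly or monthly bitmaps once and reused, which is what makes this cheaper than re-running count(distinct) per period.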
RE: Spark on Kubernetes - log4j.properties not read
Hi Dave, As part of driver pod bringup, a ConfigMap is created from all the Spark configuration parameters (with the name spark.properties) and mounted at /opt/spark/conf, so all the other files present in /opt/spark/conf are overwritten. The same is happening with log4j.properties in this case. You could try building the container with log4j.properties at some other location and pointing to that location in spark.driver.extraJavaOptions. Thanks and Regards, Abhishek From: Dave Jaffe Sent: Tuesday, June 11, 2019 6:45 AM To: user@spark.apache.org Subject: Spark on Kubernetes - log4j.properties not read I am using Spark on Kubernetes with Spark 2.4.3. I created a log4j.properties file in my local spark/conf directory and modified it so that the console (or, in the case of Kubernetes, the log) only shows warnings and higher (log4j.rootCategory=WARN, console). I then added the command COPY conf /opt/spark/conf to /root/spark/kubernetes/dockerfiles/spark/Dockerfile and built a new container. However, when I run that under Kubernetes, the program runs successfully but /opt/spark/conf/log4j.properties is not used (I still see the INFO lines when I run kubectl logs). I have tried other things, such as explicitly adding a --properties-file to my spark-submit command, and even --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/conf/log4j.properties My log4j.properties file is never seen. How do I customize log4j.properties with Kubernetes? Thanks, Dave Jaffe
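A sketch of the workaround Abhishek describes (the /opt/spark/log4j path is illustrative, not prescribed): copy log4j.properties to a directory the spark.properties ConfigMap mount will not shadow, then point both driver and executors at it.

```
# Dockerfile fragment: keep log4j.properties OUTSIDE /opt/spark/conf,
# which is overlaid by the ConfigMap mount at driver pod bringup
COPY conf/log4j.properties /opt/spark/log4j/log4j.properties

# spark-submit fragment: point log4j at the non-shadowed location
#   --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/log4j/log4j.properties
#   --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/log4j/log4j.properties
```

The key point is only that the file lives somewhere other than /opt/spark/conf; any path baked into the image works as long as the extraJavaOptions URI matches it.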
Re: ARM CI for spark
Hi Tianhua, I read a similar question to yours on the HBase mailing list, so I'd like to let you know about efforts to support AArch64 in Apache Bigtop[1]. The CI and distribution of Bigtop may not be exactly what you are looking for, but folks from Linaro and Arm are contributing to Bigtop to support the architecture. If you need further information, feel free to contact the Bigtop community: https://bigtop.apache.org/mail-lists.html Regards, Younhwoo 1. https://sched.co/LJq5 On Tue, Jun 11, 2019 at 11:22 AM Tianhua huang wrote: > Hi all, > CI testing for Apache Spark is supported by AMPLab Jenkins, and I see > there are some machines (most of them Linux amd64) for CI > development, but there seems to be no AArch64 machine for Spark CI testing. > Recently, I built and ran tests for Spark (master and branch-2.4) on my ARM > server, and unfortunately there are some problems; for example, a unit test > fails due to the LEVELDBJNI native package. For details of the Java test see > http://paste.openstack.org/show/752063/ and for the Python test see > http://paste.openstack.org/show/752709/ > and I have sent a mail asking for help: > http://mail-archives.apache.org/mod_mbox/spark-user/201906.mbox/%3ccakzhfyxi8wpyytgxnz5o9rhrpjve88g0chxww8x31ubkfr8...@mail.gmail.com%3E > So I have a question about ARM CI testing for Spark: is there any plan > to support it? Thank you very much and I will wait for your reply! >
ARM CI for spark
Hi all, CI testing for Apache Spark is supported by AMPLab Jenkins, and I see there are some machines (most of them Linux amd64) for CI development, but there seems to be no AArch64 machine for Spark CI testing. Recently, I built and ran tests for Spark (master and branch-2.4) on my ARM server, and unfortunately there are some problems; for example, a unit test fails due to the LEVELDBJNI native package. For details of the Java test see http://paste.openstack.org/show/752063/ and for the Python test see http://paste.openstack.org/show/752709/ and I have sent a mail asking for help: http://mail-archives.apache.org/mod_mbox/spark-user/201906.mbox/%3ccakzhfyxi8wpyytgxnz5o9rhrpjve88g0chxww8x31ubkfr8...@mail.gmail.com%3E So I have a question about ARM CI testing for Spark: is there any plan to support it? Thank you very much and I will wait for your reply!
Spark on Kubernetes - log4j.properties not read
I am using Spark on Kubernetes with Spark 2.4.3. I created a log4j.properties file in my local spark/conf directory and modified it so that the console (or, in the case of Kubernetes, the log) only shows warnings and higher (log4j.rootCategory=WARN, console). I then added the command COPY conf /opt/spark/conf to /root/spark/kubernetes/dockerfiles/spark/Dockerfile and built a new container. However, when I run that under Kubernetes, the program runs successfully but /opt/spark/conf/log4j.properties is not used (I still see the INFO lines when I run kubectl logs). I have tried other things, such as explicitly adding a --properties-file to my spark-submit command, and even --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/conf/log4j.properties My log4j.properties file is never seen. How do I customize log4j.properties with Kubernetes? Thanks, Dave Jaffe
Re: Spark structured streaming leftOuter join not working as I expect
Hi all, it took me some time to get the issues extracted into a piece of standalone code. I created the following gist: https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17 It has messages for 4 topics A/B/C/D and a simple Python program which shows 6 use cases, with my expectations and observations with Spark 2.4.3. It would be great if you could have a look and check whether I'm doing something wrong, or whether this is indeed a limitation of Spark. On 6/5/19 5:35 PM, Jungtaek Lim wrote: > Nice to hear you're investigating the issue deeply. > > Btw, if attaching code is not easy, maybe you could share the logical/physical > plan of any batch: "detail" in the SQL tab would show the plan as a string. > Plans from sequential batches would be much more helpful - and the streaming query > status in these batches (especially the watermark) should be helpful too. > -- CU, Joe - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Kafka Topic to Parquet HDFS with Structured Streaming
Hello Deng, Thank you for your email. The issue was with the Spark / Hadoop HDFS configuration settings. Thanks On Mon, Jun 10, 2019 at 5:28 AM Deng Ching-Mallete wrote: > Hi Chetan, > > Best to check if the user account that you're using to run the job has > permission to write to the path in HDFS. I would suggest writing the > parquet files to a different path, perhaps to a project space or user home, > rather than at the root directory. > > HTH, > Deng > > On Sat, Jun 8, 2019 at 8:00 AM Chetan Khatri > wrote: > >> Hello Dear Spark Users, >> >> I am trying to write data from a Kafka topic to Parquet on HDFS with >> Structured Streaming, but I am getting failures. Please do help. >> >> val spark: SparkSession = >> SparkSession.builder().appName("DemoSparkKafka").getOrCreate() >> import spark.implicits._ >> val dataFromTopicDF = spark >> .readStream >> .format("kafka") >> .option("kafka.bootstrap.servers", "localhost:9092") >> .option("subscribe", "test") >> .option("startingOffsets", "earliest") >> .load() >> .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") >> >> logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.") >> val topicQuery = dataFromTopicDF.writeStream >> .format("console") >> .option("truncate", false) >> .option("checkpointLocation", "/tmp/checkpoint") >> .trigger(Trigger.ProcessingTime(10.seconds)) >> .start() >> >> topicQuery.awaitTermination() >> topicQuery.stop() >> >> >> The above code works well, but when I try to write to Parquet on HDFS I am >> getting exceptions.
>> >> >> logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS") >> >> val parquetQuery = dataFromTopicDF.writeStream >> .format("parquet") >> .option("startingOffsets", "earliest") >> .option("checkpointLocation", "/tmp/checkpoint") >> .option("path", "/sample-topic") >> .start() >> >> parquetQuery.awaitTermination() >> parquetQuery.stop() >> >> >> *Exception Details:* >> >> >> Exception in thread "main" java.io.IOException: mkdir of >> /sample-topic/_spark_metadata failed >> at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067) >> at >> org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176) >> at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197) >> at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730) >> at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726) >> at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) >> at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733) >> at >> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378) >> at >> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:66) >> at >> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) >> at >> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85) >> at >> org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98) >> at >> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317) >> at >> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293) >> at >> com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35) >> at >> com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7) >> at scala.Function0$class.apply$mcV$sp(Function0.scala:34) >> at >> 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) >> at scala.App$$anonfun$main$1.apply(App.scala:76) >> at scala.App$$anonfun$main$1.apply(App.scala:76) >> at scala.collection.immutable.List.foreach(List.scala:381) >> at >> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) >> at scala.App$class.main(App.scala:76) >> at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7) >> at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) >> at >> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894) >> at >> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198) >> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228) >> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137) >> at
Re: Spark SQL
Spark can use the Hive metastore as a catalog, but it doesn't use the Hive parser or optimization engine. Instead it uses Catalyst; see https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html On Mon, Jun 10, 2019 at 2:07 PM naresh Goud wrote: > Hi Team, > > Does Spark SQL use the Hive engine to run queries? > My understanding is that Spark SQL uses the Hive metastore to get metadata > information to run queries. > > Thank you, > Naresh > -- > Thanks, > Naresh > www.linkedin.com/in/naresh-dulam > http://hadoopandspark.blogspot.com/ > >
Spark SQL
Hi Team, Does Spark SQL use the Hive engine to run queries? My understanding is that Spark SQL uses the Hive metastore to get metadata information to run queries. Thank you, Naresh -- Thanks, Naresh www.linkedin.com/in/naresh-dulam http://hadoopandspark.blogspot.com/
[Spark Core]: What is the release date for Spark 3?
Hi guys, I was not able to find the planned release date for Spark 3. Would anyone have any information on this, please? Many thanks, Alex
Fwd: Spark kafka streaming job stopped
We have a Spark Kafka streaming job running on a standalone Spark cluster. We have the following Kafka architecture: 1. Two clusters running in two data centers. 2. There is an LTM (load balancer) on top of each data center. 3. There is a GSLB on top of the LTMs. I observed that whenever any node in the Kafka cluster is down, our Spark streaming job stops. We are using the GSLB URL in our code to connect to Kafka, not the IP addresses. Please let me know whether this is expected behavior; if not, what config do we need to change? Thanks Amit
Has anyone used Spark Structured Streaming successfully in production?
https://stackoverflow.com/questions/56428367/any-clue-how-to-join-this-spark-structured-stream-joins
Re: Read hdfs files in spark streaming
Thanks All. I managed to get this working. Marking this thread as closed. On Mon, Jun 10, 2019 at 4:14 PM Deepak Sharma wrote: > This is a project requirement, where paths are being streamed into a kafka > topic. > It seems this is not possible using Spark Structured Streaming. > > > On Mon, Jun 10, 2019 at 3:59 PM Shyam P wrote: > >> Hi Deepak, >> Why are you getting paths from a kafka topic? Any specific reason to do so? >> >> Regards, >> Shyam >> >> On Mon, Jun 10, 2019 at 10:44 AM Deepak Sharma >> wrote: >> >>> The context is different here. >>> The file paths are coming as messages in a kafka topic. >>> Spark Streaming (structured) consumes from this topic. >>> Now it has to get the value from the message, i.e. the path to the file, >>> and read the json stored at the file location into another df. >>> >>> Thanks >>> Deepak >>> >>> On Sun, Jun 9, 2019 at 11:03 PM vaquar khan >>> wrote: Hi Deepak, You can use textFileStream. https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html Please start using Stack Overflow to ask questions, so other people get the benefit of the answers. Regards, Vaquar khan On Sun, Jun 9, 2019, 8:08 AM Deepak Sharma wrote: > I am using a Spark streaming application to read from kafka. > The value coming in the kafka message is a path to an hdfs file. > I am using spark 2.x, spark.read.stream. > What is the best way to read this path in Spark streaming and then > read the json stored at the hdfs path, maybe using spark.read.json, > into > a df inside the Spark streaming app. > Thanks a lot in advance > > -- > Thanks > Deepak > >>> >>> -- >>> Thanks >>> Deepak >>> www.bigdatabig.com >>> www.keosha.net >>> >> > > -- > Thanks > Deepak > www.bigdatabig.com > www.keosha.net > -- Thanks Deepak www.bigdatabig.com www.keosha.net
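For readers of the archive: one common way to approach the pattern Deepak describes is to treat each micro-batch of path messages as a small driver-side list and load the referenced files with a batch read inside foreachBatch. The sketch below is hypothetical (broker address, topic name, and helper names are all assumed, and it is not the solution the poster actually deployed); the Spark wiring is shown as comments since it needs a live cluster, while the path-extraction helper is plain, runnable Python.

```python
# Sketch: consume HDFS file paths from a Kafka-fed stream, then load each
# referenced JSON file with a batch read. All names here are illustrative.

def extract_paths(values):
    """Pull non-empty, de-duplicated paths out of decoded Kafka message values."""
    seen = []
    for v in values:
        p = (v or "").strip()
        if p and p not in seen:
            seen.append(p)
    return seen

# PySpark wiring (requires a running SparkSession and Kafka; shown as comments):
#
# stream = (spark.readStream.format("kafka")
#           .option("kafka.bootstrap.servers", "broker:9092")  # assumed address
#           .option("subscribe", "paths-topic")                # assumed topic
#           .load()
#           .selectExpr("CAST(value AS STRING) AS value"))
#
# def load_batch(batch_df, batch_id):
#     # Paths are few per batch, so collecting to the driver is cheap here.
#     paths = extract_paths(r["value"] for r in batch_df.collect())
#     for p in paths:
#         df = spark.read.json(p)  # batch read of the JSON at each path
#         # ... process df ...
#
# stream.writeStream.foreachBatch(load_batch).start()

print(extract_paths(["/data/a.json", " ", "/data/a.json", "/data/b.json"]))
```

The design choice to note: plain spark.read.json is a batch API and cannot be called on streaming column values directly, which is why the per-batch callback is the usual escape hatch for this kind of indirection.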
How are Spark Structured Streaming consumers initiated and invoked while reading multi-partitioned Kafka topics?
Hi, Any suggestions regarding below issue? https://stackoverflow.com/questions/56524921/how-spark-structured-streaming-consumers-initiated-and-invoked-while-reading-mul Thanks, Shyam
Re: Read hdfs files in spark streaming
Hi Deepak, Why are you getting paths from a kafka topic? Any specific reason to do so? Regards, Shyam On Mon, Jun 10, 2019 at 10:44 AM Deepak Sharma wrote: > The context is different here. > The file paths are coming as messages in a kafka topic. > Spark Streaming (structured) consumes from this topic. > Now it has to get the value from the message, i.e. the path to the file, > and read the json stored at the file location into another df. > > Thanks > Deepak > > On Sun, Jun 9, 2019 at 11:03 PM vaquar khan wrote: > >> Hi Deepak, >> >> You can use textFileStream. >> >> https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html >> >> Please start using Stack Overflow to ask questions, so other people get >> the benefit of the answers. >> >> >> Regards, >> Vaquar khan >> >> On Sun, Jun 9, 2019, 8:08 AM Deepak Sharma wrote: >> >>> I am using a Spark streaming application to read from kafka. >>> The value coming in the kafka message is a path to an hdfs file. >>> I am using spark 2.x, spark.read.stream. >>> What is the best way to read this path in Spark streaming and then read >>> the json stored at the hdfs path, maybe using spark.read.json, into a df >>> inside the Spark streaming app. >>> Thanks a lot in advance >>> >>> -- >>> Thanks >>> Deepak >>> >> > > -- > Thanks > Deepak > www.bigdatabig.com > www.keosha.net >
How to handle small file problem in spark structured streaming?
https://stackoverflow.com/questions/56524539/how-to-handle-small-file-problem-in-spark-structured-streaming Regards, Shyam
Re: Kafka Topic to Parquet HDFS with Structured Streaming
Hi Chetan, Best to check if the user account that you're using to run the job has permission to write to the path in HDFS. I would suggest writing the parquet files to a different path, perhaps to a project space or user home, rather than at the root directory. HTH, Deng On Sat, Jun 8, 2019 at 8:00 AM Chetan Khatri wrote: > Hello Dear Spark Users, > > I am trying to write data from a Kafka topic to Parquet on HDFS with Structured > Streaming, but I am getting failures. Please do help. > > val spark: SparkSession = > SparkSession.builder().appName("DemoSparkKafka").getOrCreate() > import spark.implicits._ > val dataFromTopicDF = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", "localhost:9092") > .option("subscribe", "test") > .option("startingOffsets", "earliest") > .load() > .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") > > logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.") > val topicQuery = dataFromTopicDF.writeStream > .format("console") > .option("truncate", false) > .option("checkpointLocation", "/tmp/checkpoint") > .trigger(Trigger.ProcessingTime(10.seconds)) > .start() > > topicQuery.awaitTermination() > topicQuery.stop() > > > The above code works well, but when I try to write to Parquet on HDFS I am > getting exceptions.
> > > logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS") > > val parquetQuery = dataFromTopicDF.writeStream > .format("parquet") > .option("startingOffsets", "earliest") > .option("checkpointLocation", "/tmp/checkpoint") > .option("path", "/sample-topic") > .start() > > parquetQuery.awaitTermination() > parquetQuery.stop() > > > *Exception Details:* > > > Exception in thread "main" java.io.IOException: mkdir of > /sample-topic/_spark_metadata failed > at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067) > at > org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176) > at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197) > at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730) > at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726) > at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) > at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:66) > at > org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) > at > org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85) > at > org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98) > at > org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293) > at > com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35) > at > com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at 
scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7) > at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894) > at > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > > > Thanks > >