Hi,
Our Spark writes to GCS are slow. The cause I see is that a staging
directory is used for the initial data generation, and the data is then copied
to the actual directory in GCS. A few of the configs and code we use are below. Any
suggestions on how to speed this up would be great.
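For illustration only, here is a hedged sketch of roughly the kind of write we do; the bucket, path and partition column are placeholders rather than our exact code:
import org.apache.spark.sql.SaveMode
// simplified stand-in for our batch write to GCS
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("dt")                    // placeholder partition column
  .parquet("gs://our-bucket/output/")   // placeholder bucket and path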
Hi,
We are using something like the following to write data to files in
Structured Streaming, and the generated files get names of the form part-*, as mentioned in
https://stackoverflow.com/questions/51056764/how-to-define-a-spark-structured-streaming-file-sink-file-path-or-file-name.
How do we control or retrieve the file names that get generated?
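For reference, a minimal sketch of the kind of file-sink query we run; the format, paths and trigger interval are placeholders, not our exact code:
import org.apache.spark.sql.streaming.Trigger
val query = parsedDF.writeStream
  .format("parquet")
  .option("path", "/data/out")                       // placeholder output directory
  .option("checkpointLocation", "/data/out/_chk")    // placeholder checkpoint directory
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
// the sink itself chooses the part-xxxxx file names under the output path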
Hi,
Is there a way to do automatic JSON schema inference with Structured
Streaming? I do not want to supply a predefined schema and bind to it.
With Spark Kafka Direct I could do spark.read.json(). I see that this is not
supported in Structured Streaming.
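For context, the workaround I have been considering is to infer the schema once from a static sample and then bind it to the stream with from_json; a hedged sketch, where the sample path, the kafkaDF stream and the column names are placeholders:
import org.apache.spark.sql.functions.{col, from_json}
// infer the schema from a representative sample, then apply it to the Kafka value column
val sampleSchema = spark.read.json("/data/sample/").schema
val parsed = kafkaDF.select(from_json(col("value").cast("string"), sampleSchema).as("data"))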
Thanks!
Hi,
How do we get information like consumer lag and queued-up batches in Structured
Streaming? The following API does not seem to give any info about lag and
queued-up batches the way DStreams did.
https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/streaming/scheduler/BatchInfo.html
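The closest I have found so far is the per-query progress exposed through StreamingQueryListener; a minimal sketch below, though which of these fields corresponds to "lag" is exactly what I am unsure about:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(event.progress.json)   // per-batch numInputRows, durationMs and source offsets
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})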
Thanks!
Hi,
How do I implement a ForeachWriter in Structured Streaming so that the connection
is created once per partition and updates are done in a batch, just like
foreachPartition on RDDs?
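A hedged sketch of what I have in mind; the open/process/close methods are the actual ForeachWriter API, while MyConnection and its methods are placeholders for our client:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{ForeachWriter, Row}
val query = df.writeStream.foreach(new ForeachWriter[Row] {
  private var conn: MyConnection = _              // placeholder connection type
  private val buffer = ArrayBuffer.empty[Row]
  override def open(partitionId: Long, version: Long): Boolean = {
    conn = MyConnection.create()                  // one connection per partition/epoch
    buffer.clear()
    true
  }
  override def process(row: Row): Unit = buffer += row        // accumulate updates
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) conn.bulkUpdate(buffer)          // flush once, like foreachPartition
    conn.close()
  }
}).start()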
Thanks for the help!
Hi,
We have code based on Spark Kafka Direct in production and we want to port
this code to Structured Streaming. Does Structured Streaming support an
equivalent of Spark Kafka Direct? What are the configs for parallelism and
scalability in Structured Streaming? In Spark Kafka Direct, the number of Kafka
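For reference, a minimal sketch of the Structured Streaming Kafka source we have been experimenting with; the broker list, topic and throttling value are placeholders:
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker list
  .option("subscribe", "events")                       // placeholder topic
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", "100000")            // caps records per micro-batch
  .load()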
Hi,
How do I get the time slice or the batch time for which the current micro-batch
is running in Spark Streaming? Currently I am using the system time, which
causes the key-clearing behaviour of reduceByKeyAndWindow to not work
properly.
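For reference, the direction I have been looking at is the (rdd, time) variant of foreachRDD, which exposes the batch time directly; a minimal sketch where eventStream is a placeholder DStream:
import org.apache.spark.streaming.Time
eventStream.foreachRDD { (rdd, batchTime: Time) =>
  val batchTimeMs = batchTime.milliseconds   // the batch's time slice in epoch millis
  // use batchTimeMs instead of System.currentTimeMillis() downstream
}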
Thanks,
Swetha
Hi,
What would be the appropriate settings to run Spark with Kafka 0.10? My job
works fine with the Kafka 0.8 integration against a Kafka 0.8 cluster, but it is very
slow with Kafka 0.10 when using the experimental Kafka Direct APIs for 0.10. I
see the following error sometimes. Please see the Kafka parameters
Hi,
ReduceByKeyAndWindow checkpoint recovery has issues when trying to recover
for the second time. Basically it loses the reduced value of the
previous window, even though that value is present in the old values that need to be
inverse-reduced, resulting in the following error. Does anyone have any idea as to
Hi,
How do I force Spark Kafka Direct to start from the latest offset when the lag
is huge in Kafka 0.10? It seems to start processing from the last offset stored
for the group id. One way to do this is to change the group id, but that would
mean that each time we need to process the job from the
Hi,
I am facing issues while trying to recover a textFileStream from a checkpoint.
Basically it tries to load the files from the beginning of the job start,
whereas I delete the files after processing them. I have the following
configs set, so I was thinking that it should not look for files
Hi,
Memory consumption and checkpointed data seem to increase incrementally
when reduceByKeyAndWindow with an inverse function is used together with mapWithState.
My application uses stateful streaming with mapWithState; the keys generated
by mapWithState are then used by reduceByKeyAndWindow to do
Hi,
Do we need to specify checkpointing for mapWithState just like we do for
updateStateByKey?
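For context, a hedged sketch of what I mean by specifying checkpointing, assuming Spark 1.6+; the path, types and state function are placeholders, and keyedCounts stands in for a DStream[(String, Long)]:
import org.apache.spark.streaming.{State, StateSpec}
ssc.checkpoint("/tmp/streaming-checkpoint")   // placeholder checkpoint directory
val spec = StateSpec.function { (key: String, value: Option[Long], state: State[Long]) =>
  val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
  state.update(sum)
  (key, sum)
}
val stateStream = keyedCounts.mapWithState(spec)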
Thanks,
Swetha
Hi,
Spark Streaming does not seem to clear the MapPartitionsRDD and ShuffledRDD that
are persisted after the use of updateStateByKey and reduceByKeyAndWindow
with inverse functions, even after checkpointing the data. Any idea why
this happens? Is there a way I can set a timeout to clear
Hi,
We use updateStateByKey in our Spark Streaming application. The partitions
cached by updateStateByKey do not seem to be getting evicted. They were
getting evicted fine with spark.cleaner.ttl in 1.5.1. I am facing issues
with partitions not getting evicted with stateful streaming after Spark
Hi,
We use updateStateByKey and reduceByKeyAndWindow and checkpoint the data. We
store the offsets in ZooKeeper. How do we make sure that the state of the job
is maintained when we redeploy the code?
Thanks!
Hi,
I have checkpointing enabled in Spark Streaming and I use updateStateByKey and
reduceByKeyAndWindow with inverse functions. How do I reduce the amount of
data that I am writing to the checkpoint, or clear out the data that I don't
care about?
Thanks!
Hi,
How do I find the time taken by each step in a stage of a Spark job? Also, how
do I find the bottleneck in each step, and whether a stage is skipped because
the RDDs are persisted in streaming?
I am trying to identify which step in a job is taking time in my Streaming
job.
Thanks!
Hi,
We use reduceByKeyAndWindow with the inverse-function feature in our Streaming
job to calculate rolling counts for the past hour and for the past 24 hours.
It seems that it iterates over all the keys in the window, even the ones
not present in the current batch, causing
Hi,
How do we bootstrap the streaming job with the previous state when we do a
code change and redeploy? We use updateStateByKey to maintain the state and
store session objects and LinkedHashMaps in the checkpoint.
Thanks,
Swetha
Hi,
Is structured streaming ready for production usage in Spark 2.2?
Thanks,
Swetha
Hi,
I see the following error when I use ReduceByKeyAndWindow in my Spark
Streaming app. I use reduce, invReduce and filterFunction as shown below.
Any idea as to why I get the error?
java.lang.Exception: Neither previous window has value for key, nor new
values found. Are you sure your key
Hi,
What happens if I don't specify checkpointing on a DStream that has
reduceByKeyAndWindow with no inverse function? Would it cause memory to
overflow? My window sizes are 1 hour and 24 hours.
I cannot provide an inverse function for this as it is based on HyperLogLog.
My code looks
Hi,
We have a requirement to calculate unique visitors in Spark Streaming. Can
HyperLogLogMonoid be applied to a sliding window in Spark Streaming to
calculate unique visitors? Any example on how to do that would be of great
help.
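For concreteness, a hedged sketch of the sort of thing I am after, assuming Twitter Algebird's HyperLogLogMonoid; the bit size, window lengths and the visitorIds stream are placeholders, and I have not verified this end to end:
import com.twitter.algebird.HyperLogLogMonoid
import org.apache.spark.streaming.Minutes
val hllMonoid = new HyperLogLogMonoid(12)   // 12 bits, roughly 1-2% error
val visitorHlls = visitorIds.map(id => hllMonoid.create(id.getBytes("UTF-8")))
val hourlyUniques = visitorHlls.reduceByWindow(hllMonoid.plus(_, _), Minutes(60), Minutes(5))
hourlyUniques.foreachRDD(rdd => rdd.collect().foreach(hll => println(hll.estimatedSize)))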
Thanks,
Swetha
Hi,
How does Spark provide Hive style bucketing support in Spark 2.x?
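For reference, the direction I have been looking at is the DataFrameWriter bucketing API added in Spark 2.0; a minimal sketch where the table and column names are placeholders:
usersDF.write
  .bucketBy(8, "user_id")
  .sortBy("user_id")
  .saveAsTable("bucketed_users")   // bucketing appears to require saveAsTable rather than save(path)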
Thanks,
Swetha
Hi,
How do I tune Spark jobs that use groupBy operations? Earlier I used
--conf spark.shuffle.memoryFraction=0.8 --conf
spark.storage.memoryFraction=0.1 to tune jobs that use groupBy, but
with Spark 2.x these configs seem to have been deprecated.
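My understanding, which I would like to confirm, is that the unified memory manager in Spark 2.x replaces those with the settings below; the values here are only examples:
--conf spark.memory.fraction=0.6 --conf spark.memory.storageFraction=0.5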
What would be the appropriate
Hi,
How do I configure the global_temp database via SparkConf? I know that it is a
system-preserved database. Can it be configured via SparkConf?
Thanks,
Swetha
Hi,
The global_temp database is not getting created when I try to use Spark 2.x.
Do I need to create it manually or do I need any permissions to do the same?
17/02/28 12:08:09 INFO HiveMetaStore.audit: ugi=user12345 ip=unknown-ip-addr cmd=get_database: global_temp
17/02/28 12:08:09
Hi,
I have been trying to upgrade my Spark job to 2.x. I see the following
error. It seems to be looking for some global_temp database by default. Is it
the default behaviour of Spark 2.x to look for a global_temp database?
17/02/27 16:59:09 INFO HiveMetaStore.audit: ugi=user1234
Hi,
How do I set the Hive configurations in Spark 2.1? I have the following in
1.6. How do I set the Hive-related configs using the new SparkSession?
sqlContext.sql(s"use ${HIVE_DB_NAME} ")
sqlContext.setConf("hive.exec.dynamic.partition", "true")
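A hedged sketch of what I assume is the 2.x equivalent, with HIVE_DB_NAME as in the 1.6 code above; please correct me if there is a better way:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.sql(s"use ${HIVE_DB_NAME} ")
spark.conf.set("hive.exec.dynamic.partition", "true")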
Hi,
I created an external table in Spark SQL using hiveContext, something like
CREATE EXTERNAL TABLE IF NOT EXISTS sampleTable stored as ORC LOCATION ...
I can see the files getting created under the location I specified and I am able
to query it as well, but I don't see the table in Hive when I
Hi,
I need to do integration tests using Spark Streaming. My idea is to spin up
kafka using docker locally and use it to feed the stream to my Streaming
Job. Any suggestions on how to do this would be of great help.
Thanks,
Swetha
Hi,
I need to write some integration tests for my Spark Streaming app. Any
example on how to do this would be of great help.
Thanks,
Swetha
Hi,
I keep getting the following error in my Spark Streaming app every now and then,
after the job has run for around 10 hours. I have the two classes
registered in Kryo as shown below; sampleMap is a field in SampleSession,
as shown below. Any suggestion on how to avoid this would be of great
Hi,
How do I insert data into 2000 partitions (directories) of ORC/Parquet at a
time using Spark SQL? It does not seem to be performant when I try to insert
into 2000 directories of Parquet/ORC using Spark SQL. Did anyone face this issue?
Thanks!
Hi,
In my Spark SQL query to insert data, I have around 14,000 partitions of
data which seems to be causing memory issues. How can I insert the data for
100 partitions at a time to avoid any memory issues?
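For concreteness, a hedged sketch of the chunked approach I am considering; the DataFrame, partition column, chunk size and output path are placeholders:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
val partitions = df.select("datePartition").distinct().collect().map(_.getString(0))
partitions.grouped(100).foreach { chunk =>
  df.filter(col("datePartition").isin(chunk: _*))
    .write.mode(SaveMode.Append)
    .partitionBy("datePartition")
    .parquet("/data/out")   // placeholder output path
}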
Hi,
How do I set the degree of parallelism in Spark SQL? I am using the following,
but it somehow seems to allocate only two executors at a time.
sqlContext.sql("set spark.sql.shuffle.partitions=200")
Thanks,
Swetha
Hi,
What factors decide the number of executors when doing a Spark SQL insert?
Right now when I submit my job in Mesos I see only 2 executors getting
allocated all the time.
Thanks!
Hi,
I see some memory issues when trying to insert the data in the form of ORC
using Spark SQL. Please find the query and exception below. Any idea as to
why this is happening?
sqlContext.sql(" CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
record STRING) PARTITIONED BY (datePartition
Hi,
We seem to be getting a lot of LeaderLostExceptions, and our source stream is
running with the default value of rebalance.backoff.ms, which is 2000. I was
thinking of increasing this value to 5000. Any suggestions on this?
Thanks!
Hi,
How do I add an accumulator for a Set in Spark?
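A hedged sketch of one way I think this could be done, assuming Spark 2.x's AccumulatorV2; the element type String and the names are just examples:
import org.apache.spark.util.AccumulatorV2
class SetAccumulator extends AccumulatorV2[String, Set[String]] {
  private var set = Set.empty[String]
  override def isZero: Boolean = set.isEmpty
  override def copy(): SetAccumulator = { val acc = new SetAccumulator; acc.set = this.set; acc }
  override def reset(): Unit = { set = Set.empty }
  override def add(v: String): Unit = { set += v }
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = { set ++= other.value }
  override def value: Set[String] = set
}
val uniqueIds = new SetAccumulator
sc.register(uniqueIds, "uniqueIds")
// add on the executors, e.g. rdd.foreach(id => uniqueIds.add(id)), then read uniqueIds.value on the driver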
Thanks!
Hi,
What is a good unit-testing framework for Spark batch/streaming jobs? I use
core Spark, Spark SQL with DataFrames, and the Streaming API. Is there a
good framework to cover unit tests for these APIs?
Thanks!
Hi,
How can I control the number of Parquet files created under a
partition? I have sqlContext queries to create a table and insert the
records as follows. They create around 250 Parquet files under each
partition, though I was expecting around 2 or 3 files. Due to
Hi,
I seem to be getting the following error when I try to insert data to a
parquet datasource. Any idea as to why this is happening?
org.apache.hadoop.hive.ql.metadata.HiveException:
parquet.hadoop.MemoryManager$1: New Memory allocation 1045004 bytes is
smaller than the minimum allocation size
Hi,
I am getting an error when trying to overwrite a partition dynamically.
Following is the code and the error. Any idea as to why this is happening?
test.write.partitionBy("dtPtn","idPtn").mode(SaveMode.Overwrite).format("parquet").save("/user/test/sessRecs")
16/02/28 18:02:55 ERROR
Hi,
I need to overwrite data dynamically to specific partitions depending on
filters. How can that be done in sqlContext?
Thanks,
Swetha
Hi,
How do I join multiple tables and use subqueries in Spark SQL using
sqlContext? Can I do this using sqlContext or do I have to use HiveContext
for the same?
Thanks!
Hi,
I am saving my records as Parquet files using DataFrames in
HDFS. How do I delete records using DataFrames?
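For context, my current understanding, which I would like to validate, is that Parquet files are immutable, so a delete ends up being a filtered rewrite; a hedged sketch with placeholder paths and predicate:
import org.apache.spark.sql.SaveMode
val records = sqlContext.read.parquet("/data/records")
records.filter("id <> 'someIdToDelete'")
  .write.mode(SaveMode.Overwrite)
  .parquet("/data/records_rewritten")   // then swap directories, or point readers at the new location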
Thanks!
Hi,
How do I use a custom partitioner when I do a saveAsTable on a DataFrame?
Thanks,
Swetha
Hi,
I get an error when I do a SaveAsTable as shown below. I do have write
access to the hive volume. Any idea as to why this is happening?
val df = testDF.toDF("id", "rec")
df.printSchema()
val options = Map("path" -> "/hive/test.db/")
Hi,
How do I partition a DataFrame of User objects by an id so that I can
both join on the id and also retrieve all the user objects within a
time period when queried?
Thanks!
Hi,
Is predicate pushdown supported by default for DataFrames, or does it depend
on the format in which the DataFrame is stored, such as Parquet?
Thanks,
Swetha
Hi,
How do I join an RDD with a Hive table and retrieve only the records that I am
interested in? Suppose I have an RDD with 1000 records and there is a Hive
table with 100,000 records; I should be able to join the RDD with the Hive
table by an id and load only those 1000
Hi,
Is it possible to query a Hive table whose data is stored as
Parquet from inside map/mapPartitions in Spark? My requirement is that I
have a User table in Hive/HDFS, and for each record inside a sessions RDD, I
should be able to query the User table, and if the User table
Hi,
We have a requirement to store documents in HDFS. The
documents are just JSON strings. We should be able to query them by
id using Spark SQL / HiveContext as and when needed. What would be the
correct approach to do this?
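For concreteness, a hedged sketch of the simplest approach I can think of; the paths and the id value are placeholders, and whether this is the right approach is exactly my question:
val docs = sqlContext.read.json("/data/documents/")   // schema is inferred from the JSON strings
docs.registerTempTable("documents")
sqlContext.sql("SELECT * FROM documents WHERE id = '12345'").show()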
Thanks!
Hi,
How can I query a Hive table from inside mapPartitions to retrieve a value
by id?
Thanks,
Swetha
Hi,
How do I do a lookup by id against a set of records stored in HDFS from inside a
transformation/action of an RDD?
Thanks,
Swetha
Hi,
How do I get a fixed range of records from an RDD in the driver? Suppose I want
the records from 100 to 1000 and then want to save them to some external database. I
know that I can do it from the workers per partition, but I want to avoid that for
some reasons. The idea is to collect the data to the driver and
Hi,
How do I use registerTempTable to register an RDD as a temporary table and
use it inside mapPartitions of a different RDD?
Thanks,
Swetha
Hi,
How do I edit/delete a message posted in Apache Spark User List?
Thanks!
Hi,
I have the Streaming job running in QA and prod. Due to Kafka issues, both
jobs went down. After the Kafka issues were resolved and the checkpoint
directory was deleted, the driver in the QA job restarted the job
automatically and the application UI came up. But in the prod job, the
Hi,
I see the following error in Spark Streaming with Kafka Direct. I think
this error is related to the Kafka topic. Any suggestions on how to avoid this
error would be of great help.
java.nio.channels.ClosedChannelException
at
Hi,
Can a tempTable registered in sqlContext be used for queries inside foreachRDD,
as shown below?
My requirement is that I have a set of data in the form of Parquet inside
HDFS, and I need to register the data
as a tempTable using sqlContext and query it inside foreachRDD, as shown
below.
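To make the question concrete, a hedged sketch of the pattern I mean; the paths, table names and record type are placeholders:
val lookup = sqlContext.read.parquet("/data/lookup/")   // placeholder parquet path
lookup.registerTempTable("lookupTable")
stream.foreachRDD { rdd =>
  val batchDF = sqlContext.createDataFrame(rdd)         // assumes an RDD of case-class records
  batchDF.registerTempTable("batchTable")
  sqlContext.sql("SELECT b.* FROM batchTable b JOIN lookupTable l ON b.id = l.id").count()
}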
Hi,
How do I load partial data from HDFS using Spark SQL? Suppose I want to load
data based on a filter like
"Select * from table where id = " using Spark SQL with DataFrames;
how can that be done? The
idea here is that I do not want to load the whole dataset into memory when I
use the SQL, and I
Hi,
Can SQLContext be used inside mapPartitions? My requirement is to register
a set of data from HDFS as a temp table and to be able to look it up from inside
mapPartitions based on a key. If that is not supported, is there a different
way of doing this?
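For concreteness, a hedged sketch of one alternative I have been considering, broadcasting the lookup data instead of using SQLContext on the executors; the paths, key/value columns and the rec.key field are placeholders:
val lookupMap = sqlContext.read.parquet("/data/lookup/")
  .rdd.map(r => (r.getString(0), r.getString(1)))   // placeholder key/value columns
  .collectAsMap()
val broadcastLookup = sc.broadcast(lookupMap)
records.mapPartitions { iter =>
  iter.map(rec => (rec, broadcastLookup.value.get(rec.key)))   // rec.key is a placeholder field
}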
Thanks!
Hi,
My Spark batch job sometimes seems to hang for a long time before it
starts the next stage or exits. It typically happens when a stage contains
mapPartitions/foreachPartition. Any idea why this is
happening?
Thanks,
Swetha
Hi,
How do I run multiple Spark jobs that take Spark Streaming data as their
input as a workflow in Oozie? We have to run our Streaming job first and
then have a workflow of Spark batch jobs to process the data.
Any suggestions on this would be of great help.
Thanks!
Hi,
How do Ganglia and Graphite compare for monitoring the
Streaming cluster? Which one has more advantages over the other?
Thanks!
Hi,
Should gmond be installed on all the Spark nodes? What should the host
and port be? Should they be the host and port of gmetad?
# Enable GangliaSink for all instances
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.name=hadoop_cluster1
Hi,
Were you able to set up custom metrics in GangliaSink? If so, how did you
register the custom metrics?
Thanks!
Hi,
Where does the *.sink.csv.directory directory get created? I cannot see any
metrics in the logs. How did you verify ConsoleSink and CsvSink?
Thanks!
Hi,
How do I configure custom metrics using Ganglia Sink?
Thanks!
Hi,
I cannot see any metrics either. How did you verify that ConsoleSink and
CsvSink work OK? Where does *.sink.csv.directory get created?
Hi,
How do I do a Maven build that enables monitoring using Ganglia? What is the
command for that?
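My understanding is that the Ganglia sink lives in a separate LGPL module that has to be enabled with a build profile; a hedged sketch of the command, where the Hadoop profile is a placeholder for whatever the build already uses:
./build/mvn -Pspark-ganglia-lgpl -Phadoop-2.6 -DskipTests clean package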
Thanks,
Swetha
Hi,
In my Streaming job, most of the time seems to be taken by one executor. The
shuffle read records count is 713758 for that one executor but 0 for the
others. I have a groupBy followed by updateStateByKey, flatMap, map,
reduceByKey and updateStateByKey operations in that stage. I am suspecting
Hi,
I seem to be getting a ClassCastException in Kryo serialization. The error
is below. The Child1 class is a map in the parent class, and Child1 has a HashSet
testObjects of type Object1. I get an error when it tries to
deserialize Object1. Any idea why this is happening?
Hi,
How do I identify total scheduling delay in a Streaming app using Ganglia? Any
sample code to integrate Ganglia with a Streaming app to report the time
delay and the number of batches failed in the last 5 minutes would be of great
help.
Thanks,
Swetha
Hi,
Our processing times in Spark Streaming with the Kafka Direct approach seem to
have increased considerably with the increase in site traffic. Would
increasing the number of Kafka partitions decrease the processing times?
Any suggestions on tuning to reduce the processing times would be of
Hi,
What is the way to modularize Spark Streaming jobs something along the lines
of what Spring XD does?
Thanks,
Swetha
Hi,
We need to monitor and identify whether the Streaming job has been failing for
the last 5 minutes and restart the job accordingly. In most cases our Spark
Streaming with Kafka Direct fails with leader-lost errors, or offsets-not-found
errors for a partition. What is the most effective way to
Hi,
Our Streaming job fails with the following errors. As the errors
below show, they are all related to Kafka losing offsets and
OffsetOutOfRangeException.
What options do we have other than fixing Kafka? We would like to do
something like the following. How can we achieve 1 and 2
Hi,
I have the following code that saves the Parquet files in my hourly batch to
HDFS. My idea is to coalesce the output to 1500 smaller files. On the first run
it gives me 1500 files in HDFS, but on the next runs the file count keeps
increasing even though I coalesce.
It's not getting coalesced to
Hi,
I am submitting my Spark job with the supervise option, as shown below. When I
kill the driver and the app from the UI, the driver does not restart
automatically. This is in cluster mode. Any suggestion on how to make
automatic driver restart work would be of great help.
--supervise
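For context, a hedged sketch of the full submit command; the master URL, class and jar are placeholders, and my understanding is that --supervise only takes effect with --deploy-mode cluster on a standalone or Mesos master:
spark-submit --master spark://master:7077 --deploy-mode cluster --supervise \
  --class com.example.StreamingApp streaming-app.jar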
Thanks,
Hi,
Does the receiver-based approach lose any data in case of a leader/broker loss
in Spark Streaming? We currently use Kafka Direct for Spark Streaming and it
seems to fail when there is a leader loss, and we can't really
guarantee that there won't be any leader loss due to rebalancing.
If