Re: RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Jungtaek Lim
Hi, You need to add the prefix "kafka." to the configurations that should be propagated to the Kafka client. The others are used by the Spark data source itself (the Kafka connector in this case). https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations
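For example, a minimal Scala sketch of a Kafka sink with a prefixed producer setting (topic name, bootstrap servers, size value and checkpoint path are placeholders):

    // Options prefixed with "kafka." are passed to the Kafka producer;
    // unprefixed options are consumed by the Spark Kafka connector itself.
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")   // propagated to Kafka
      .option("kafka.max.request.size", "5242880")       // propagated to Kafka (5 MB)
      .option("topic", "output-topic")                   // used by the connector
      .option("checkpointLocation", "/tmp/checkpoints")  // used by Spark
      .start()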

Re: PySpark .collect() output to Scala Array[Row]

2020-05-25 Thread Wim Van Leuven
Looking at the stack trace, your data from Spark gets serialized to an ArrayList (of something), whereas in your Scala code you are using an Array of Rows. So the types don't line up. That's the exception you are seeing: the JVM searches for a signature that simply does not exist. Try to turn the
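One way to make the signatures line up on the Scala side, as a sketch (the method and parameter names mirror the Python Write_Graphml mentioned in the question and are assumptions, not the actual project code):

    import java.util.{List => JList}
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.Row

    object GraphmlWriter {
      // Accept the java.util.List that Py4J actually sends across the gateway,
      // then convert it to the Array[Row] the existing logic expects.
      def writeGraphml(data: JList[Row], graphmlPath: String): Unit = {
        val rows: Array[Row] = data.asScala.toArray
        println(s"writing ${rows.length} rows to $graphmlPath")
      }
    }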

Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Srinivas V
Hello, Even for me it comes as 0 when I print it in onQueryProgress. I use LongAccumulator as well. Yes, it prints on my local machine but not on the cluster. But one consolation is that when I send metrics to Grafana, the values do show up there. On Tue, May 26, 2020 at 3:10 AM Something Something <
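For reference, a minimal Scala sketch of the setup being described (assuming a SparkSession named spark; the accumulator name and where it gets incremented are illustrative, not the poster's actual code):

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // Registered on the driver; incremented inside the streaming transformation
    // (e.g. recordCount.add(1) in a map or mapGroupsWithState function).
    val recordCount = spark.sparkContext.longAccumulator("recordCount")

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // Runs on the driver after each micro-batch; prints whatever has been
        // merged back into the accumulator so far.
        println(s"recordCount so far: ${recordCount.value}")
      }
    })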

PySpark .collect() output to Scala Array[Row]

2020-05-25 Thread Nick Ruest
Hi, I've hit a wall trying to implement a couple of Scala methods in a Python version of our project. I've implemented a number of these already, but I'm getting hung up on this one. My Python function looks like this: def Write_Graphml(data, graphml_path, sc): return

RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Something Something
I keep getting this error message: *The message is 1169350 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.* As indicated in other posts, I am trying to set the “max.request.size” configuration in the Producer as

Re: Using Spark Accumulators with Structured Streaming

2020-05-25 Thread Something Something
No, this is not working even if I use LongAccumulator. On Fri, May 15, 2020 at 9:54 PM ZHANG Wei wrote: > There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is
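As a point of reference, a minimal sketch of a map-valued accumulator backed by a thread-safe collection, along the lines of the restriction quoted above (the class name is made up, and it would still need sparkContext.register(...) before use):

    import scala.collection.concurrent.TrieMap
    import org.apache.spark.util.AccumulatorV2

    class CountMapAccumulator[T] extends AccumulatorV2[T, TrieMap[T, Long]] {
      private val map = TrieMap.empty[T, Long]

      override def isZero: Boolean = map.isEmpty

      override def copy(): CountMapAccumulator[T] = {
        val acc = new CountMapAccumulator[T]
        map.foreach { case (k, v) => acc.map.put(k, v) }
        acc
      }

      override def reset(): Unit = map.clear()

      // synchronized keeps the read-modify-write atomic; a sketch, not tuned
      // for contention.
      override def add(key: T): Unit = synchronized {
        map.update(key, map.getOrElse(key, 0L) + 1L)
      }

      override def merge(other: AccumulatorV2[T, TrieMap[T, Long]]): Unit = synchronized {
        other.value.foreach { case (k, v) => map.update(k, map.getOrElse(k, 0L) + v) }
      }

      override def value: TrieMap[T, Long] = map
    }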

Re: Spark API and immutability

2020-05-25 Thread Holden Karau
So even on RDDs, cache/persist mutate the RDD object. The important thing for Spark is that the data represented by the RDD/DataFrame isn't mutated. On Mon, May 25, 2020 at 10:56 AM Chris Thomas wrote: > The cache() method on the DataFrame API caught me out. Having learnt that DataFrames
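A small Scala illustration of that point (assuming a SparkSession named spark):

    // cache() sets the storage level on the existing Dataset (bookkeeping state),
    // returns the same object, and leaves the data it represents untouched.
    val df = spark.range(0, 1000000).toDF("id")
    val cached = df.cache()

    println(cached eq df)     // true: no new immutable copy is created
    println(df.storageLevel)  // now MEMORY_AND_DISK (the Dataset default) instead of NONE
    df.count()                // the first action actually materializes the cache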

Fwd: Spark API and immutability

2020-05-25 Thread Chris Thomas
The cache() method on the DataFrame API caught me out. Having learnt that DataFrames are built on RDDs and that RDDs are immutable, when I saw the statement df.cache() in our codebase I thought ‘This must be a bug, the result is not assigned, the statement will have no effect.’ However, I’ve

Re: Arrow RecordBatches/Pandas Dataframes to (Arrow enabled) Spark Dataframe conversion in streaming fashion

2020-05-25 Thread Jorge Machado
Hey, from what I know you can try to union them: df.union(df2). Not sure if this is what you need. On 25. May 2020, at 13:53, Tanveer Ahmad - EWI wrote: > Hi all, I need some help regarding Arrow RecordBatches/Pandas DataFrames to (Arrow enabled) Spark DataFrame conversions. Here
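A minimal Scala sketch of that suggestion; the per-batch frames below are placeholders for the DataFrames created from each incoming pandas DataFrame, and the same union call exists on the PySpark DataFrame API:

    import org.apache.spark.sql.DataFrame

    // Placeholder per-batch frames with an identical schema.
    val batch1: DataFrame = spark.range(0, 10).toDF("id")
    val batch2: DataFrame = spark.range(10, 20).toDF("id")
    val batch3: DataFrame = spark.range(20, 30).toDF("id")

    // union requires matching schemas and resolves columns by position.
    val combined: DataFrame = Seq(batch1, batch2, batch3).reduce(_ union _)
    combined.count()  // 30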

Arrow RecordBatches/Pandas Dataframes to (Arrow enabled) Spark Dataframe conversion in streaming fashion

2020-05-25 Thread Tanveer Ahmad - EWI
Hi all, I need some help regarding Arrow RecordBatches/Pandas DataFrames to (Arrow enabled) Spark DataFrame conversions. The example here explains very well how to convert a single Pandas DataFrame to a Spark DataFrame [1]. But in my case, some external applications are generating Arrow

Re: Parallelising JDBC reads in spark

2020-05-25 Thread Manjunath Shetty H
Thanks Dhaval for the suggestion, but in the case I mentioned in the previous mail data can still be missed, as the row number will change. - Manjunath From: Dhaval Patel Sent: Monday, May 25, 2020 3:01 PM To: Manjunath Shetty H Subject: Re: Parallelising JDBC
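For context, this is the standard partitioned-read API being discussed; the sketch below assumes a stable numeric column (e.g. a primary key) rather than a computed row number, and all connection details and bounds are placeholders:

    // Spark splits the read into numPartitions range queries on partitionColumn,
    // so that column must not change between the partition queries.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://host:5432/db")
      .option("dbtable", "schema.table")
      .option("user", "user")
      .option("password", "password")
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "8")
      .load()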

Re: Parallelising JDBC reads in spark

2020-05-25 Thread Manjunath Shetty H
Thanks Georg for the suggestion, but at this point changing the design is not really an option. Any other pointers would be helpful. Thanks, Manjunath From: Georg Heiler Sent: Monday, May 25, 2020 11:52 AM To: Manjunath Shetty H Cc: Mike Artz ; user Subject:

Re: Parallelising JDBC reads in spark

2020-05-25 Thread Georg Heiler
Well, you seem to have performance and consistency problems. Using a CDC tool that fits your database, you might be able to fix both. However, streaming the change events of the database log might be a bit more complicated. Tools like https://debezium.io/ could be useful - depending on your source

Re: Parallelising JDBC reads in spark

2020-05-25 Thread Manjunath Shetty H
Hi Georg, Thanks for the response. Can you please elaborate on what you mean by change data capture? Thanks, Manjunath From: Georg Heiler Sent: Monday, May 25, 2020 11:14 AM To: Manjunath Shetty H Cc: Mike Artz ; user Subject: Re: Parallelising JDBC reads in spark