How to enable hive support on an existing Spark session?

2020-05-26 Thread Kun Huang (COSMOS)

Hi Spark experts,

I am seeking an approach to enable Hive support manually on an existing
Spark session.

Currently, HiveContext seems the best fit for my scenario. However, this class
has already been marked as deprecated, and the recommended alternative is
SparkSession.builder.enableHiveSupport(), which must be called before the Spark
session is created.
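
For reference, a minimal sketch of that recommended pattern (the app name here
is illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("my-app")        // illustrative
      .enableHiveSupport()      // has to be applied before the session is first created
      .getOrCreate()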

I wonder if there are other workarounds?

Thanks,
Kun


Re: RecordTooLargeException in Spark *Structured* Streaming

2020-05-26 Thread Something Something
Thanks. I missed that part of the documentation. Appreciate your help. Regards.
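
For the archives, a rough sketch of the corrected sink options (the variable
names, topic, checkpoint path, and size value are illustrative; the key point
is the "kafka." prefix, which routes the setting to the underlying Kafka
producer):

    df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", config.outputBootstrapServer)
      .option("topic", "output-topic")                   // illustrative topic
      .option("checkpointLocation", "/tmp/checkpoints")  // illustrative path
      .option("kafka.max.request.size", "10485760")      // ~10 MB; size to fit the largest record
      .start()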

On Mon, May 25, 2020 at 10:42 PM Jungtaek Lim 
wrote:

> Hi,
>
> You need to add the prefix "kafka." to the configurations that should be
> propagated to the Kafka client. Other options are consumed by the Spark data
> source itself (the Kafka connector in this case).
>
>
> https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Tue, May 26, 2020 at 6:42 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I keep getting this error message:
>>
>>
>> The message is 1169350 bytes when serialized which is larger than the
>> maximum request size you have configured with the max.request.size
>> configuration.
>>
>>
>>
>> As indicated in other posts, I am trying to set the “max.request.size”
>> configuration in the Producer as follows:
>>
>>
>> .writeStream
>>   .format("kafka")
>>   .option(
>>     "kafka.bootstrap.servers",
>>     config.outputBootstrapServer
>>   )
>>   .option(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000")
>>
>>
>>
>> But this is not working. Am I setting this correctly? Is there a
>> different way to set this property under Spark Structured Streaming?
>>
>>
>> Please help. Thanks.
>>
>>
>>


Re: Using Spark Accumulators with Structured Streaming

2020-05-26 Thread Something Something
Hmm... how would they go to Grafana if they are not getting computed in
your code? I am talking about the application-specific accumulators. The
other standard counters, such as 'event.progress.inputRowsPerSecond', are
getting populated correctly!
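
For comparison, a minimal sketch of the pattern being discussed (the
accumulator name and listener wiring below are illustrative, not the exact
code from this thread):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    val spark = SparkSession.builder().getOrCreate()
    val recordCounter = spark.sparkContext.longAccumulator("recordCounter")  // illustrative

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // Built-in per-trigger metrics come from the progress object itself
        println(s"inputRowsPerSecond = ${event.progress.inputRowsPerSecond}")
        // The accumulator is read on the driver and only reflects updates that
        // executors have already reported back for completed tasks
        println(s"recordCounter = ${recordCounter.value}")
      }
    })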

On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:

> Hello,
> Even for me it comes out as 0 when I print it in onQueryProgress. I use a
> LongAccumulator as well. Yes, it prints correctly on my local machine but not
> on the cluster. One consolation is that when I send metrics to Grafana, the
> values do show up there.
>
> On Tue, May 26, 2020 at 3:10 AM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> No, this is not working even if I use LongAccumulator.
>>
>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
>>
>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should be
>>> atomic or thread-safe. I'm wondering whether an implementation backed by
>>> `java.util.Map[T, Long]` can meet that requirement. Is there any chance you
>>> could replace CollectionLongAccumulator with CollectionAccumulator [2] or
>>> LongAccumulator [3] and test whether the StreamingListener and the rest of
>>> the code still work?
>>>
>>> ---
>>> Cheers,
>>> -z
>>> [1]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>> [2]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>> [3]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>
>>> 
>>> From: Something Something 
>>> Sent: Saturday, May 16, 2020 0:38
>>> To: spark-user
>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>
>>> Can someone from the Spark development team tell me whether this
>>> functionality is supported and tested? I've spent a lot of time on this but
>>> can't get it to work. To add more context: we have our own accumulator class
>>> that extends AccumulatorV2, and in it we keep track of one or more
>>> accumulators. Here's the definition:
>>>
>>>
>>> class CollectionLongAccumulator[T]
>>> extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>
>>> When the job begins we register an instance of this class:
>>>
>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>
>>> Is this working under Structured Streaming?
>>>
>>> I will keep looking for alternative approaches, but any help would be
>>> greatly appreciated. Thanks.
>>>
>>>
>>>
>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>> mailinglist...@gmail.com> wrote:
>>>
>>> In my Structured Streaming job I am updating Spark accumulators in the
>>> updateAcrossEvents method, but they are always 0 when I try to print them in
>>> my StreamingListener. Here's the code:
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>   updateAcrossEvents
>>> )
>>>
>>>
>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
>>> StreamingListener that writes the values of the accumulators in its
>>> 'onQueryProgress' method, but in that method the accumulators are ALWAYS
>>> ZERO!
>>>
>>> When I added log statements in updateAcrossEvents, I could see that these
>>> accumulators are getting incremented as expected.
>>>
>>> This only happens when I run in cluster mode. In local mode it works fine,
>>> which implies that the accumulators are not getting distributed correctly,
>>> or something like that!
>>>
>>> Note: I've seen quite a few answers on the web that tell me to perform an
>>> "action". That's not a solution here; this is a stateful Structured
>>> Streaming job. Yes, I am also registering the accumulators in the
>>> SparkContext.
>>>
>>>
>>>
>>>
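
A quick way to try ZHANG Wei's suggestion from earlier in this thread is to
swap in the built-in accumulators and check whether the listener sees their
values; a minimal sketch (all names are illustrative, and `spark` is assumed
to be the active SparkSession):

    // Built-in, thread-safe accumulators from SparkContext
    val eventCount = spark.sparkContext.longAccumulator("eventCount")
    val seenKeys   = spark.sparkContext.collectionAccumulator[String]("seenKeys")

    // Inside updateAcrossEvents (illustrative):
    //   eventCount.add(1L)
    //   seenKeys.add(key)

    // In the listener, read them on the driver:
    //   println(s"eventCount=${eventCount.value}, keys seen=${seenKeys.value.size}")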