Re: Corrupt record handling in spark structured streaming and from_json function

2018-12-26 Thread Colin Williams
https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming

On Wed, Dec 26, 2018 at 2:42 PM Colin Williams
 wrote:
>
> From my initial impression, it looks like I'd need to create my own
> `from_json` using `jsonToStructs` as a reference, but handle
> `case _: BadRecordException => null` or similar, so that the
> non-matching string can be written to a corrupt records column.
>
> On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
>  wrote:
> >
> > Hi,
> >
> > I'm trying to figure out how I can write records that don't match a
> > JSON read schema via Spark Structured Streaming to an output sink /
> > parquet location. Previously I did this in batch mode via the corrupt
> > record column feature of the batch reader. But in Spark Structured
> > Streaming I'm reading a string from Kafka and using from_json on the
> > value of that string. If it doesn't match my schema then from_json
> > returns null for all the rows, and does not populate a corrupt record
> > column. But I want to somehow obtain the source Kafka string in a
> > dataframe and write it to an output sink / parquet location.
> >
> > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
> >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> >   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> > }
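
A minimal sketch of one workaround (not the poster's code, and assuming the standard Kafka source): keep the raw Kafka string next to the struct produced by from_json, so rows where the parse comes back null can be routed to their own corrupt-records sink. The column names `raw` and `parsed` are only illustrative.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

// Keep the original Kafka string ("raw") alongside the parsed struct ("parsed").
def getKafkaEventsWithRaw(rawKafkaDataFrame: DataFrame, schema: StructType): DataFrame = {
  rawKafkaDataFrame
    .select(col("value").cast("string").as("raw"))
    .withColumn("parsed", from_json(col("raw"), schema))
}

// Rows whose parse failed have a null struct and can be written to a separate sink:
// val events  = getKafkaEventsWithRaw(kafkaDF, schema)
// val good    = events.where(col("parsed").isNotNull).select("parsed.*")
// val corrupt = events.where(col("parsed").isNull).select("raw")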




Re: Corrupt record handling in spark structured streaming and from_json function

2018-12-26 Thread Colin Williams
From my initial impression, it looks like I'd need to create my own
`from_json` using `jsonToStructs` as a reference, but handle
`case _: BadRecordException => null` or similar, so that the
non-matching string can be written to a corrupt records column.

On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
 wrote:
>
> Hi,
>
> I'm trying to figure out how I can write records that don't match a
> JSON read schema via Spark Structured Streaming to an output sink /
> parquet location. Previously I did this in batch mode via the corrupt
> record column feature of the batch reader. But in Spark Structured
> Streaming I'm reading a string from Kafka and using from_json on the
> value of that string. If it doesn't match my schema then from_json
> returns null for all the rows, and does not populate a corrupt record
> column. But I want to somehow obtain the source Kafka string in a
> dataframe and write it to an output sink / parquet location.
>
> def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
>   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
>   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> }




Corrupt record handling in spark structured streaming and from_json function

2018-12-26 Thread Colin Williams
Hi,

I'm trying to figure out how I can write records that don't match a
JSON read schema via Spark Structured Streaming to an output sink /
parquet location. Previously I did this in batch mode via the corrupt
record column feature of the batch reader. But in Spark Structured
Streaming I'm reading a string from Kafka and using from_json on the
value of that string. If it doesn't match my schema then from_json
returns null for all the rows, and does not populate a corrupt record
column. But I want to somehow obtain the source Kafka string in a
dataframe and write it to an output sink / parquet location.

def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
  val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
  jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
}
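
For comparison, a minimal sketch of the batch-mode corrupt record handling referred to above, assuming the JSON reader's PERMISSIVE mode together with the columnNameOfCorruptRecord option; the schema fields and paths are only illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("corrupt-records-batch").getOrCreate()

// The read schema must contain the corrupt record column for it to be populated.
val schema = new StructType()
  .add("id", StringType)
  .add("event", StringType)
  .add("_corrupt_record", StringType)

val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schema)
  .json("/tmp/events")              // illustrative input path

// Unparsable rows land in _corrupt_record and can be written out separately:
// df.where(col("_corrupt_record").isNotNull).write.parquet("/tmp/corrupt")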




Re: How to clean up logs-dirs and local-dirs of running spark streaming in yarn cluster mode

2018-12-26 Thread shyla deshpande
Hi Fawze,
Thank you for the link. But that is exactly what I am doing.
I think this is related to the
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
setting.
When the disk utilization exceeds this setting, the node is marked
unhealthy.

Other than increasing the default 90%, is there anything else I can do?

Thanks
-Shyla



On Tue, Dec 25, 2018 at 7:26 PM Fawze Abujaber  wrote:

> http://shzhangji.com/blog/2015/05/31/spark-streaming-logging-configuration/
>
> On Wed, Dec 26, 2018 at 1:05 AM shyla deshpande 
> wrote:
>
>> Please point me to any documentation if available. Thanks
>>
>> On Tue, Dec 18, 2018 at 11:10 AM shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Is there a way to do this without stopping the streaming application in
>>> yarn cluster mode?
>>>
>>> On Mon, Dec 17, 2018 at 4:42 PM shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
 I get the ERROR
 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are bad:
 /var/log/hadoop-yarn/containers

 Is there a way to clean up these directories while the spark streaming
 application is running?

 Thanks

>>>
>
> --
> Take Care
> Fawze Abujaber
>


Re: Packaging kafka certificates in uber jar

2018-12-26 Thread Colin Williams
Hi, thanks. This is part of the solution I found after writing the
question. The other part is that I needed to write the input stream
to a temporary file. I would prefer not to write any temporary file,
but the ssl.keystore.location property seems to expect a file path.

On Tue, Dec 25, 2018 at 5:26 AM Anastasios Zouzias  wrote:
>
> Hi Colin,
>
> You can place your certificates under src/main/resources and include them in 
> the uber JAR, see e.g.:
> https://stackoverflow.com/questions/40252652/access-files-in-resources-directory-in-jar-from-apache-spark-streaming-context
>
> Best,
> Anastasios
>
> On Mon, Dec 24, 2018 at 10:29 PM Colin Williams 
>  wrote:
>>
>> I've been trying to read from Kafka via a Spark streaming client. I
>> found out the Spark cluster doesn't have certificates deployed. Then I
>> tried using the same local certificates I've been testing with, by
>> packing them in an uber jar and getting a File handle from the
>> classloader resource. But I'm getting a File Not Found exception.
>> These are JKS certificates. Is anybody aware of how to package
>> certificates in a jar with a Kafka client, preferably the Spark one?
>>
>>
>
>
> --
> -- Anastasios Zouzias
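
A rough sketch of the workaround described above, under the assumption that the JKS keystore is bundled under src/main/resources: copy the classpath resource out of the uber JAR into a temporary file and hand that path to the Kafka source, since ssl.keystore.location expects a filesystem path rather than a resource inside the JAR. The resource name, broker address, topic, and environment variable below are only illustrative.

import java.nio.file.{Files, StandardCopyOption}

// Copy a classpath resource (inside the uber JAR) to a temp file and return its path.
def materializeResource(resource: String): String = {
  val in = getClass.getResourceAsStream(resource)
  require(in != null, s"resource $resource not found on classpath")
  val tmp = Files.createTempFile("keystore-", ".jks")
  tmp.toFile.deleteOnExit()
  Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING)
  in.close()
  tmp.toString
}

// Usage sketch with the Structured Streaming Kafka source (Kafka SSL settings take the "kafka." prefix):
// val keystorePath = materializeResource("/kafka.client.keystore.jks")
// val df = spark.readStream.format("kafka")
//   .option("kafka.bootstrap.servers", "broker:9093")
//   .option("kafka.security.protocol", "SSL")
//   .option("kafka.ssl.keystore.location", keystorePath)
//   .option("kafka.ssl.keystore.password", sys.env("KEYSTORE_PASSWORD"))
//   .option("subscribe", "events")
//   .load()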
