Re: Kafka Avro Schema Registry Support

2018-09-28 Thread Raghu Angadi
Looks like your producer is writing Avro specific records.

Can you read the records using the bundled console consumer? I think it will be
simpler for you to get that returning valid records first and then use the same
deserializer config with your KafkaIO reader.
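
For a quick check outside Beam, something like the following standalone consumer
could confirm the records decode with that config (a rough sketch only; broker,
registry URL, group id and topic are placeholders, not values from this thread):

    // Needs: org.apache.kafka.clients.consumer.*, org.apache.kafka.common.serialization.StringDeserializer,
    // io.confluent.kafka.serializers.KafkaAvroDeserializer, java.util.*, java.time.Duration
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");                    // placeholder
    props.put("group.id", "schema-check");                            // placeholder
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
    props.put("schema.registry.url", "http://schema-registry:8081");  // placeholder
    try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));      // placeholder topic
      for (ConsumerRecord<String, Object> record : consumer.poll(Duration.ofSeconds(5))) {
        System.out.println(record.value());  // prints GenericRecords if Avro decoding works
      }
    }

If this prints valid records, the same deserializer settings are what the KafkaIO
consumer config needs.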

On Fri, Sep 28, 2018 at 9:33 AM Vishwas Bm  wrote:

> Hi Raghu,
>
> Thanks for the response.  We are now trying with GenericAvroDeserializer
> but still seeing issues.
> We have a producer which sends messages to kafka in format
> .
>
> Below is the code snippet we have used with Beam KafkaIO.
>
> org.apache.avro.Schema schema = null;
> try {
>     schema = new org.apache.avro.Schema.Parser().parse(new File("Schema path"));
> } catch (Exception e) {
>     e.printStackTrace();
> }
> KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, GenericRecord>read()
>     .withBootstrapServers(bootstrapServerUrl)
>     .withTopic(topicName)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializerAndCoder(GenericAvroDeserializer.class, AvroCoder.of(schema))
>     .updateConsumerProperties(ImmutableMap.of("schema.registry.url", schemaUrl))
>     .withTimestampPolicyFactory((tp, prevWatermark) ->
>         new KafkaCustomTimestampPolicy(maxDelay, timestampInfo, prevWatermark));
>
> Below is the error seen:
>
> Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.avro.AvroRuntimeException: Not a Specific class: interface
> org.apache.avro.generic.GenericRecord
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
>     at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
>     at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
>     at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
>     ... 8 more
> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface
> org.apache.avro.generic.GenericRecord
>     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
>     at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
>
>
> Can you provide some pointers on this?
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
>
> On Fri, Sep 28, 2018 at 3:12 AM Raghu Angadi  wrote:
>
>> It is a compilation error due to a type mismatch for the value type.
>>
>> Please match the key and value types of the KafkaIO reader, i.e. if you have
>> KafkaIO.<KeyType, ValueType>read(), 'withValueDeserializer()' needs a
>> class object which extends 'Deserializer<ValueType>'. Since
>> KafkaAvroDeserializer extends 'Deserializer<Object>', your ValueType
>> needs to be Object instead of String.
>>
>> Btw, it might be better to use GenericAvroDeserializer or
>> SpecificAvroDeserializer from the same package.
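
A minimal illustration of that type constraint (hypothetical names; this only shows
the compile-time matching, and a Coder for the Object values would still need to be
provided before the pipeline can run):

    // KafkaAvroDeserializer implements Deserializer<Object>, so it only satisfies
    // withValueDeserializer() when the read's value type is Object; with
    // KafkaIO.<String, String>read() the same call does not compile.
    KafkaIO.Read<String, Object> read =
        KafkaIO.<String, Object>read()
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(KafkaAvroDeserializer.class);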
>>
>>
>> On Thu, Sep 27, 2018 at 10:31 AM Vishwas Bm  wrote:
>>
>>>
>>> Hi Raghu,
>>>
>>> The deserializer is provided by confluent
>>> *io.confluent.kafka.serializers* package.
>>>
>>> When we set the valueDeserializer as KafkaAvroDeserializer, we are getting the
>>> below error:
>>>     The method withValueDeserializer(Class<? extends Deserializer<String>>) in the
>>>     type KafkaIO.Read<String, String> is not applicable for the arguments
>>>     (Class<KafkaAvroDeserializer>)
>>>
>>> From the error, it looks like Beam does not support this deserializer.
>>> Also, we wanted to use the Schema Registry from Confluent; is this supported
>>> in Beam?
>>>
>>>
>>> *Thanks & Regards,*
>>> *Vishwas *
>>>
>>>
>>> On Thu, Sep 27, 2018 at 10:28 PM Raghu Angadi 
>>> wrote:
>>>
 You can set key/value deserializers:
 https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L101
 What are the errors you see?

 Also note that Beam includes AvroCoder for handling Avro records in
 Beam.

 On Thu, Sep 27, 2018 at 6:05 AM rahul patwari <
 rahulpatwari8...@gmail.com> wrote:

> Hi,
>
> We have a use case to read data from Kafka serialized with
> KafkaAvroSerializer, and the schema is present in the Schema Registry.
>
> When we try to use the ValueDeserializer
> io.confluent.kafka.serializers.KafkaAvroDeserializer to get a GenericRecord,
> we are seeing errors.
>
> Does KafkaIO.read() support reading from the Schema Registry and using the
> Confluent KafkaAvroSerDe?

Re: Kafka Avro Schema Registry Support

2018-09-28 Thread Vishwas Bm
Hi Raghu,

Thanks for the response.  We are now trying with GenericAvroDeserializer
but still seeing issues.
We have a producer which sends messages to kafka in format
.

Below is the code snippet we have used with Beam KafkaIO.

org.apache.avro.Schema schema = null;
try {
    schema = new org.apache.avro.Schema.Parser().parse(new File("Schema path"));
} catch (Exception e) {
    e.printStackTrace();
}
KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers(bootstrapServerUrl)
    .withTopic(topicName)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder(GenericAvroDeserializer.class, AvroCoder.of(schema))
    .updateConsumerProperties(ImmutableMap.of("schema.registry.url", schemaUrl))
    .withTimestampPolicyFactory((tp, prevWatermark) ->
        new KafkaCustomTimestampPolicy(maxDelay, timestampInfo, prevWatermark));

Below is the error seen:

Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.avro.AvroRuntimeException: Not a Specific class: interface
org.apache.avro.generic.GenericRecord
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
    at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
    at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
    at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
    ... 8 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface
org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)


Can you provide some pointers on this?


*Thanks & Regards,*

*Vishwas *



On Fri, Sep 28, 2018 at 3:12 AM Raghu Angadi  wrote:

> It is a compilation error due to a type mismatch for the value type.
>
> Please match the key and value types of the KafkaIO reader, i.e. if you have
> KafkaIO.<KeyType, ValueType>read(), 'withValueDeserializer()' needs a
> class object which extends 'Deserializer<ValueType>'. Since
> KafkaAvroDeserializer extends 'Deserializer<Object>', your ValueType
> needs to be Object instead of String.
>
> Btw, it might be better to use GenericAvroDeserializer or
> SpecificAvroDeserializer from the same package.
>
>
> On Thu, Sep 27, 2018 at 10:31 AM Vishwas Bm  wrote:
>
>>
>> Hi Raghu,
>>
>> The deserializer is provided by confluent
>> *io.confluent.kafka.serializers* package.
>>
>> When we set the valueDeserializer as KafkaAvroDeserializer, we are getting the
>> below error:
>>     The method withValueDeserializer(Class<? extends Deserializer<String>>) in the
>>     type KafkaIO.Read<String, String> is not applicable for the arguments
>>     (Class<KafkaAvroDeserializer>)
>>
>> From the error, it looks like Beam does not support this deserializer.
>> Also, we wanted to use the Schema Registry from Confluent; is this supported in
>> Beam?
>>
>>
>> *Thanks & Regards,*
>> *Vishwas *
>>
>>
>> On Thu, Sep 27, 2018 at 10:28 PM Raghu Angadi  wrote:
>>
>>> You can set key/value deserializers:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L101
>>> What are the errors you see?
>>>
>>> Also note that Beam includes AvroCoder for handling Avro records in Beam.
>>>
>>> On Thu, Sep 27, 2018 at 6:05 AM rahul patwari <
>>> rahulpatwari8...@gmail.com> wrote:
>>>
 Hi,

 We have a use case to read data from Kafka serialized with
 KafkaAvroSerializer, and the schema is present in the Schema Registry.

 When we try to use the ValueDeserializer
 io.confluent.kafka.serializers.KafkaAvroDeserializer to get a GenericRecord,
 we are seeing errors.

 Does KafkaIO.read() support reading from the Schema Registry and using the
 Confluent KafkaAvroSerDe?

 Regards,
 Rahul

>>>


Re: Agenda for the Beam Summit London 2018

2018-09-28 Thread Rose Nguyen
Wow, this looks fantastic! Thanks to the organizers!

On Thu, Sep 27, 2018 at 11:29 PM Andrew Psaltis 
wrote:

> This is great. Any chance it will be recorded, or at a minimum the slides
> made available afterwards? Unfortunately, I won't be able to make it to London
> next week.
>
> Best,
> Andrew
>
> On Fri, Sep 28, 2018 at 10:11 AM Pablo Estrada  wrote:
>
>> Very exciting. I will have to miss it, but I'm excited to see what comes
>> out of it:)
>> Thanks to Gris, Matthias and other organizers.
>> Best
>> -P.
>>
>> On Thu, Sep 27, 2018, 4:26 PM Jean-Baptiste Onofré 
>> wrote:
>>
>>> Great !! Thanks Gris.
>>>
>>> Looking forward to seeing you all next Monday in London.
>>>
>>> Regards
>>>
>>> JB
>>> On 27 Sept 2018, at 18:03, Griselda Cuevas  wrote:

 Hi Beam Community,

 We have finalized the agenda for the Beam Summit London 2018; it's here:
 https://www.linkedin.com/feed/update/urn:li:activity:6450125487321735168/


 We had a great number of talk proposals; thank you so much to everyone
 who submitted one! We also sold out the event, so we're very excited to see
 the community growing.


 See you around,

 Gris on behalf of the Organizing Committee

>>>

-- 
Rose Thị Nguyễn


Re: Advice for piping many CSVs with different columns names to one bigQuery table

2018-09-28 Thread Ziyad Muhammed
Hi Eila,

I'm not sure I understand the complexity of your problem.
If you do not have to perform any transformation on the data inside the CSVs
and just need to load them into BigQuery, isn't it enough to use bq load with
schema auto-detection?
https://cloud.google.com/bigquery/docs/schema-detect
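
For what it's worth, the same idea through the BigQuery Java client would look
roughly like the sketch below (dataset, table and bucket names are placeholders,
and the exact builder methods should be checked against the google-cloud-bigquery
client docs):

    // Rough sketch: load CSV files from GCS into one table with schema auto-detection.
    // Classes are from com.google.cloud.bigquery.
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableId table = TableId.of("my_dataset", "my_table");                   // placeholders
    LoadJobConfiguration load =
        LoadJobConfiguration.newBuilder(table, "gs://my-bucket/csvs/*.csv") // placeholder path
            .setFormatOptions(FormatOptions.csv())
            .setAutodetect(true)                                            // schema auto-detection
            .build();
    Job job = bigquery.create(JobInfo.of(load));
    // Optionally job.waitFor() to block until the load finishes (throws InterruptedException).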

Best
Ziyad


On Thu, Sep 27, 2018 at 9:35 PM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Thank you!
> Probably around 50.
>
> Best,
> Eila
>
> On Thu, Sep 27, 2018 at 1:23 AM Ankur Goenka  wrote:
>
>> Hi Eila,
>>
>> That seems reasonable to me.
>>
>> Here is a reference on writing to BQ
>> https://github.com/apache/beam/blob/1ffba44f7459307f5a134b8f4ea47ddc5ca8affc/sdks/python/apache_beam/examples/complete/game/leader_board.py#L326
>>
>> May I know how many distinct columns you are expecting across all files?
>>
>>
>> On Wed, Sep 26, 2018 at 8:06 PM OrielResearch Eila Arich-Landkof <
>> e...@orielresearch.org> wrote:
>>
>>> Hi Ankur / users,
>>>
>>> I would like to make sure that the suggested pipeline can work for my
>>> needs.
>>>
>>> So, additional clarification:
>>> - The CSV files have a few common and a few different columns. Each CSV file
>>> represents a sample's measurement record.
>>>
>>> - When the CSVs are merged together, I expect to have one table with the
>>> combined columns from all samples. Will your suggested pipeline allow that?
>>>
>>> - If I understand the following pipeline correctly:
>>>
>>> *Read files => Parse lines => Generate pCollections for each column =>
>>> GroupBy column name => Write to BQ*
>>>
>>> *Read files:* will generate a pCollection element from each CSV line (= file)
>>> *Parse Lines:* will extract the column name and its matching value
>>> (generating a tuple)
>>> *Generate pCollection for each column:* will generate a pCollection
>>> from the tuples
>>> *GroupBy:* will merge each column name with all the relevant samples'
>>> values (does it need to know the column names to group by, or will it
>>> automatically use the tuple key?)
>>> *WriteToBQ:* will NOT (is that true?) be able to write the values
>>> matching the relevant sample. The samples that didn't have a value for a
>>> specific key will get values from other samples that are available in
>>> that column's collection record.
>>>
>>> - If the above understanding is correct, would extracting & merging the
>>> headers from all 2.5M CSV files in advance to derive the schema, and then
>>> using an additional pipeline to populate the data into BQ with that schema, be
>>> the "right" approach?
>>>
>>> Please let me know if I am missing something here / what your thoughts are.
>>> Many thanks,
>>> Eila
>>>
>>> On Wed, Sep 26, 2018 at 12:04 PM OrielResearch Eila Arich-Landkof <
>>> e...@orielresearch.org> wrote:
>>>
 Hi Ankur,

 Thank you. Trying this approach now. Will let you know if I have any
 issue implementing it.
 Best,
 Eila

 On Tue, Sep 25, 2018 at 7:19 PM Ankur Goenka  wrote:

> Hi Eila,
>
> If I understand correctly, the objective is to read a large number of
> CSV files, each of which contains a single row with multiple columns,
> deduplicate the columns across the files, and write them to BQ.
> You are using a pandas DF to deduplicate the columns for a small set of
> files, which might not work for a large number of files.
>
> You can use Beam's GroupBy to deduplicate the columns and write them to
> BigQuery. Beam is capable of reading and managing a large number of files when
> given the path to the directory containing those files.
> So the approach would be:
> Read files => Parse lines => Generate pCollections for each column =>
> GroupBy column name => Write to BQ
> For reference, here is an example of reading a file and doing a groupby:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py
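
A very rough sketch of that shape (the thread references the Python SDK; the same
idea is shown here with the Java SDK, assuming each CSV holds one header line plus
one data row, with placeholder paths and the BigQuery write left out):

    // Read whole files, emit (column, value) pairs, then group the values per column.
    // Beam classes are from org.apache.beam.sdk.*; p is an existing Pipeline.
    PCollection<KV<String, Iterable<String>>> valuesPerColumn =
        p.apply(FileIO.match().filepattern("gs://my-bucket/csvs/*.csv"))   // placeholder path
         .apply(FileIO.readMatches())
         .apply("PairColumnsWithValues",
             FlatMapElements
                 .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                 .via((FileIO.ReadableFile file) -> {
                   try {
                     String[] lines = file.readFullyAsUTF8String().split("\r?\n");
                     String[] columns = lines[0].split(",");   // header row
                     String[] values = lines[1].split(",");    // single data row
                     List<KV<String, String>> pairs = new ArrayList<>();
                     for (int i = 0; i < columns.length && i < values.length; i++) {
                       pairs.add(KV.of(columns[i], values[i]));
                     }
                     return pairs;
                   } catch (IOException e) {
                     throw new RuntimeException(e);
                   }
                 }))
         .apply(GroupByKey.create());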
>
> Note: I am not very familiar with BQ, so I can't think of any direct
> approach to dump data into BQ.
>
> Thanks,
> Ankur
>
>
> On Tue, Sep 25, 2018 at 12:13 PM OrielResearch Eila Arich-Landkof <
> e...@orielresearch.org> wrote:
>
>> Hello,
>> I would like to write a large number of CSV files to BQ, where the
>> headers from all of them are aggregated into one common header. Any advice
>> is very much appreciated.
>>
>> The details are:
>> 1. 2.5M CSV files
>> 2. Each CSV file: a header of 50-60 columns
>> 3. Each CSV file: one data row
>>
>> There are common columns between the CSV files, but I don't know them in
>> advance. I would like to have all the CSV files in one BigQuery table.
>>
>> My current method:
>> When there was a smaller number of files, I read the CSV files and
>> appended them to one pandas dataframe that was written to a file
>> (total.csv). total.csv was the input to the Beam pipeline.
>>
>> small CSVs => Pandas DF => total CSV => pCollection => Big Query
>>
>> The challenge with that