[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985612#comment-15985612
 ] 

ASF GitHub Bot commented on BEAM-1573:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/2330


> KafkaIO does not allow using Kafka serializers and deserializers
> 
>
> Key: BEAM-1573
> URL: https://issues.apache.org/jira/browse/BEAM-1573
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Affects Versions: 0.4.0, 0.5.0
>Reporter: peay
>Assignee: Raghu Angadi
>Priority: Minor
> Fix For: First stable release
>
>
> KafkaIO does not allow overriding the serializer and deserializer settings 
> of the Kafka consumers and producers it uses internally. Instead, it allows 
> setting a `Coder`, and has a simple Kafka serializer/deserializer wrapper class 
> that calls the coder.
> I appreciate that supporting Beam coders is good and consistent with the 
> rest of the system. However, is there a reason to completely disallow using 
> custom Kafka serializers instead?
> This is a limitation when working with an Avro schema registry, for instance, 
> which requires custom serializers. One can write a `Coder` that wraps a 
> custom Kafka serializer, but that means two levels of unnecessary wrapping.
> In addition, the `Coder` abstraction is not equivalent to Kafka's 
> `Serializer`, which gets the topic name as input. Using a `Coder` wrapper 
> would require duplicating the output topic setting in the argument to 
> `KafkaIO` and when building the wrapper, which is inelegant and error prone.
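To make the topic-duplication point concrete, here is a minimal sketch (the {{TopicBoundEncoder}} name is hypothetical, not KafkaIO code), assuming Kafka's {{Serializer}} interface:

{code}
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical adapter: binding a topic-aware Kafka Serializer into a
// topic-less encoding API (like Beam's Coder) forces the topic to be fixed
// at construction time, duplicating the topic configured on the KafkaIO sink.
class TopicBoundEncoder<T> {
  private final Serializer<T> serializer; // e.g. a schema-registry serializer
  private final String topic;             // duplicated from the sink settings

  TopicBoundEncoder(Serializer<T> serializer, String topic) {
    this.serializer = serializer;
    this.topic = topic;
  }

  byte[] encode(T value) {
    // Kafka's Serializer.serialize() takes the topic; Coder.encode() does not.
    return serializer.serialize(topic, value);
  }
}
{code}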





[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942302#comment-15942302
 ] 

ASF GitHub Bot commented on BEAM-1573:
--

GitHub user peay opened a pull request:

https://github.com/apache/beam/pull/2330

[BEAM-1573] Use Kafka serializers instead of coders in KafkaIO

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---



**This is a work in progress, do not merge**.

Modified:
- Use Kafka serializers and deserializers in KafkaIO
- Added helper methods `fromAvro` and `toAvro`, to use serialization based 
on `AvroCoder`. This is uniform with other IO such as HDFS.
- Moved `CoderBasedKafkaSerializer` out, and added 
`CoderBasedKafkaDeserializer`. These are used for `toAvro`/`fromAvro`, and can be 
useful to port existing code that relies on coders.
- Added `InstantSerializer` and `InstantDeserializer`, as `Instant` is used 
in some of the tests.
 
Writer lets Kafka handle serialization itself. Reader uses Kafka byte 
deserializers, and calls the user-provided Kafka deserializer from `advance`. 
Note that Kafka serializers and deserializers are not themselves 
`Serializable`. Hence, I've used a `Class<..>` in the `spec` for both read and 
write.
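For illustration, a minimal sketch of the `Class`-based approach (the class and field names here are assumptions, not necessarily the PR's actual code):

{code}
import java.io.Serializable;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical sketch: the spec stores a Class token (which is Serializable)
// and each reader instantiates its own Deserializer, since Kafka
// (de)serializers do not implement java.io.Serializable themselves.
class ReadSpec<V> implements Serializable {
  private final Class<? extends Deserializer<V>> valueDeserializerClass;

  ReadSpec(Class<? extends Deserializer<V>> valueDeserializerClass) {
    this.valueDeserializerClass = valueDeserializerClass;
  }

  Deserializer<V> newValueDeserializer() {
    try {
      return valueDeserializerClass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("Could not instantiate deserializer", e);
    }
  }
}
{code}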

There is still an issue, though. `Read` still takes **both a deserializer 
and a coder**. This is because the source must implement 
`getDefaultOutputCoder`, and I am not sure how to infer the coder from the 
deserializer in this context. Having to provide both is heavy. Any thoughts?

cc @rangadi @jkff


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/peay/beam BEAM-1573

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/2330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2330


commit 511a1301190b08b05573e3025d7ade2746d61e5f
Author: peay 
Date:   2017-03-26T14:51:59Z

[BEAM-1573] Use Kafka serializers instead of coders in KafkaIO






[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-13 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923595#comment-15923595
 ] 

Raghu Angadi commented on BEAM-1573:


Thanks [~peay]. Please go ahead and send a GitHub PR; it does not need to be 
ready, a partial implementation is also fine. Just mention that it is not ready.

A related issue is how we want to handle deserialization errors. It looks like 
KafkaConsumer.poll() throws KafkaException in case of deserialization errors, so 
we don't know which record caused the error either. What if a runner wants 
to skip the record (or a bundle) that causes such errors (not sure if any of 
the runners does this)? This is not a deal breaker; it is fine as long as we 
properly propagate the exception to the user.

One option is to invoke the deserializer ourselves inside advance(). This gives 
full control over how we want to handle deserialization errors.
Another option, to keep track of serialized size: we could have a wrapper 
deserializer class that stores the byte size and invokes the user's deserializer. 
This also gives control over how we want to handle the exceptions. A sketch of 
such a wrapper is below.
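A minimal sketch of that second option (the wrapper name is hypothetical):

{code}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical wrapper: records the serialized byte size before delegating to
// the user's deserializer; exceptions surface to the caller, which can decide
// how to handle them.
class SizeTrackingDeserializer<T> implements Deserializer<T> {
  private final Deserializer<T> delegate;
  private long lastSerializedSize = -1;

  SizeTrackingDeserializer(Deserializer<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    delegate.configure(configs, isKey);
  }

  @Override
  public T deserialize(String topic, byte[] data) {
    lastSerializedSize = (data == null) ? 0 : data.length;
    return delegate.deserialize(topic, data);
  }

  @Override
  public void close() {
    delegate.close();
  }

  long lastSerializedSize() {
    return lastSerializedSize;
  }
}
{code}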




[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-12 Thread peay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906595#comment-15906595
 ] 

peay commented on BEAM-1573:


[~rangadi] I have made some good progress with running the deserializer within 
Kafka on the consumer thread. An issue with this is how to implement 
`approxBacklogInBytes`. The current implementation actually deserializes 
manually (i.e., outside Kafka and the consumer thread) in `advance`. The 0.10 
API has {{ConsumerRecord.serializedKeySize()}} and 
{{ConsumerRecord.serializedValueSize()}}, which would be perfect for that 
computation, but the 0.9 API does not have them. What would you suggest?
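One possible workaround (an assumption on my part, not something the 0.9 API provides): with a `byte[]`/`byte[]` consumer, the serialized sizes are available from the raw records themselves:

{code}
// Hypothetical helper: with a byte[]/byte[] consumer, the serialized size
// needed for approxBacklogInBytes is just the raw array lengths, without
// ConsumerRecord.serializedKeySize()/serializedValueSize() from the 0.10 API.
static long serializedSize(byte[] key, byte[] value) {
  return (key == null ? 0 : key.length) + (value == null ? 0 : value.length);
}
{code}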



[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-08 Thread peay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901275#comment-15901275
 ] 

peay commented on BEAM-1573:


OK, I will get started on a PR.



[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-06 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897824#comment-15897824
 ] 

Raghu Angadi commented on BEAM-1573:


The new API ({{withKeySerializer()}} and {{withValueSerializer()}}) sounds good. We can 
mark the old API deprecated and also provide a Coder-based Kafka Serializer and 
Deserializer in case users still want to use coders (say, for transition); a 
sketch of such a shim follows below.

Implementation-wise, note that the Kafka deserializer would run on the Kafka consumer 
thread, which is outside the normal 'advance()' (invoked by the runner on the 
reader). That implies we need to propagate deserialization errors appropriately 
and throw them in advance(). Alternatively, we could invoke the deserializer 
explicitly inside advance() rather than on the consumer poll thread; not sure if 
there are any drawbacks to that.
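A rough sketch of the Coder-based deserializer mentioned above (hypothetical code; the exact {{Coder.decode}} signature varies across Beam versions):

{code}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical transition shim: lets existing Coder-based users plug into a
// Deserializer-based KafkaIO API.
class CoderDeserializer<T> implements Deserializer<T> {
  private final Coder<T> coder;

  CoderDeserializer(Coder<T> coder) {
    this.coder = coder;
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // Nothing to configure; the Coder is fully specified at construction.
  }

  @Override
  public T deserialize(String topic, byte[] data) {
    try {
      // Single-argument decode; older Beam versions also take a Context.
      return coder.decode(new ByteArrayInputStream(data));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void close() {}
}
{code}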

[~peay], a PR will be useful.



[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-06 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1589#comment-1589
 ] 

Eugene Kirpichov commented on BEAM-1573:


Yes, this would be a backward-incompatible change. We are sort of fine with 
making such changes if there is a good reason and before declaring the Beam API 
stable (the first 1.x release), and this would be one of a family of changes 
bringing Beam in accordance with its own style guide, which we consider a 
necessary evil. We already removed coders from TextIO as part of that.

So yes, please feel free to proceed, and I'll be happy to review the PR.



[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-06 Thread peay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897752#comment-15897752
 ] 

peay commented on BEAM-1573:


[~rangadi] OK, that makes sense. I was hoping to try and keep everything as a 
single step, for instance to be able to leverage {{withTimestampFn}}, 
but I'll go with (1) for now.

If the long-term plan is to remove the use of coders in read/write and allow 
passing in Kafka serializers directly, that was my original point, so all the 
better. I am happy to work on a PR for that if you want me to. I think 
{{KafkaIO}} can still provide a typed reader/writer with 
{{withCustomKafkaValueSerializer}}-like methods, to avoid the 
extraneous ParDo and having to call a utility to get something other than 
`byte[]`, which I assume is often going to be the case.

The main issue I see is that removing {{withValueCoder}} and so on will 
break API compatibility; I'm not sure what the project's policy is on that, [~jkff]? 
A deprecation phase of a couple of releases, and then breaking changes?



[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-06 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896989#comment-15896989
 ] 

Eugene Kirpichov commented on BEAM-1573:


At a conference right now, but a quick comment: yes, as Raghu said, we're getting 
rid of Coders as a general parsing mechanism. Use of coders for the purpose 
for which KafkaIO currently uses them is explicitly forbidden by the Beam 
PTransform Style Guide: 
https://beam.apache.org/contribute/ptransform-style-guide/#coders .

We should replace that with having KafkaIO return byte[] and having convenience 
utilities for deserializing these byte[] using Kafka deserializers, e.g. by 
wrapping the code Raghu posted as a utility in the kafka module (packaged, say, 
as a SerializableFunction; see the sketch below).
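A hedged sketch of what such a utility might look like (class and method names are assumptions, not actual kafka-module code):

{code}
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical utility: packages a Kafka Deserializer as a SerializableFunction,
// instantiating it lazily from its class since Deserializer is not Serializable.
class DeserializeFn<V> implements SerializableFunction<KafkaRecord<byte[], byte[]>, V> {
  private final Class<? extends Deserializer<V>> clazz;
  private transient Deserializer<V> deserializer;

  DeserializeFn(Class<? extends Deserializer<V>> clazz) {
    this.clazz = clazz;
  }

  @Override
  public V apply(KafkaRecord<byte[], byte[]> record) {
    if (deserializer == null) {
      try {
        deserializer = clazz.getDeclaredConstructor().newInstance();
      } catch (ReflectiveOperationException e) {
        throw new RuntimeException(e);
      }
    }
    // The topic comes from the record itself, so multi-topic reads work too.
    return deserializer.deserialize(record.getTopic(), record.getKV().getValue());
  }
}
{code}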

Raghu or @peay, perhaps consider sending a PR to fix this? It seems like it 
should be rather easy, though it would merit a short discussion on 
d...@beam.apache.org first.



[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-06 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896948#comment-15896948
 ] 

Raghu Angadi commented on BEAM-1573:


@peay,
There are two levels of solutions for the deserializer (and serializer) question:
  # Reasonable ways to use custom Kafka deserializers & serializers.
 * This is very feasible now, including the case where you are reading from 
multiple topics.
  # Updating the KafkaIO API to pass Kafka serializers directly to the Kafka 
consumer.
 * We might end up doing this, not exactly how you proposed, but rather 
replacing coders with Kafka (de)serializers. There is no need to include both, I 
think.
 * There is a discussion on the Beam mailing lists about removing the use of 
coders directly in sources and other places, and that might be the right time to 
add this support. (cc [~jkff])

Are you more interested in 1 or 2?

One way to use any Kafka deserializer (for (1)):
{code}
// Note that KafkaRecord includes topic name, partition, etc.
PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords =
    pipeline
        .apply(KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withTopics(ImmutableList.of("topic_a")));

kafkaRecords.apply(ParDo.of(
    new DoFn<KafkaRecord<byte[], byte[]>, MyAvroRecord>() {

      private final Map<String, ?> config = ...; // Kafka config (serializable map)
      private transient Deserializer<MyAvroRecord> kafkaDeserializer;

      @Setup
      public void setup() {
        kafkaDeserializer = new MyDeserializer();
        // Deserializer.configure() takes the config map and an isKey flag.
        kafkaDeserializer.configure(config, false);
      }

      @ProcessElement
      public void processElement(ProcessContext context) {
        MyAvroRecord record = kafkaDeserializer.deserialize(
            context.element().getTopic(), context.element().getKV().getValue());
        context.output(record);
      }

      @TearDown
      public void tearDown() {
        kafkaDeserializer.close();
      }
    }));
{code}



[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-03-02 Thread peay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15892746#comment-15892746
 ] 

peay commented on BEAM-1573:


My concern is for both source and sink.

I'd like to be able to use custom 
{{org.apache.kafka.common.serialization.Serializer}}/{{Deserializer}}s. An example 
is 
http://docs.confluent.io/2.0.0/schema-registry/docs/serializer-formatter.html#serializer
 for working with setups where Kafka topics contain Avro serialized using an 
Avro schema registry. That one is an {{Object}} {{Serializer/Deserializer}}, but I 
also have similar Kafka serializers with arbitrary types.

The encoding/decoding methods in 
{{org.apache.kafka.common.serialization.Serializer}}/{{Deserializer}} are 
(simplified interfaces sketched below):
- {{byte[] serialize(String topic, T data)}}
- {{T deserialize(String topic, byte[] data)}}
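For reference, simplified versions of those interfaces (as in the Kafka 0.9/0.10 clients), showing the topic parameter that {{Coder}} has no analogue for:

{code}
// Simplified from org.apache.kafka.common.serialization (0.9/0.10-era clients).
interface Serializer<T> extends java.io.Closeable {
  void configure(java.util.Map<String, ?> configs, boolean isKey);
  byte[] serialize(String topic, T data); // topic-aware, unlike a Beam Coder
  void close();
}

interface Deserializer<T> extends java.io.Closeable {
  void configure(java.util.Map<String, ?> configs, boolean isKey);
  T deserialize(String topic, byte[] data);
  void close();
}
{code}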

I would like to be able to support a syntax like this:
{code}
KafkaIO.read()
    .withBootstrapServers(this.broker)
    .withTopics(ImmutableList.of(this.topic))
    .withCustomKafkaValueDeserializerAndCoder(
        new SomeCustomKafkaDeserializer(), AvroCoder.of(xxx))
    .withCustomKafkaKeyDeserializerAndCoder(
        new SomeCustomKafkaDeserializer(), AvroCoder.of(xxx));

KafkaIO.write()
    .withBootstrapServers(this.broker)
    .withTopic(this.topic)
    .withCustomKafkaValueSerializer(new SomeCustomSerializer())
    .withCustomKafkaKeySerializer(new SomeCustomSerializer());
{code}

In both cases, Kafka would use the custom serializer/deserializer directly.

Now, why is it hard to express this currently? KafkaIO is implemented 
differently for read and write, so let us consider the two cases. I have a 
working patch for the above syntax that is straightforward for writes, but 
requires a bunch of changes for reads.

For write, the Coder is wrapped into an actual 
{{org.apache.kafka.common.serialization.Serializer}} through 
{{CoderBasedKafkaSerializer}}. I can make a {{CustomCoder}}, but I still have to 
pass it the topic name manually. Also, we end up with a wrapper for a Kafka 
serializer, wrapped in a Coder, itself wrapped in a Kafka serializer.

Reads are implemented differently (I am not sure why). Instead of wrapping the 
coders into a Kafka deserializer, everything is hard-wired to use a `byte[]` 
Kafka consumer. Then, KafkaIO calls the coder after data has been returned by 
the consumer. Here also, one can make a {{CustomCoder}}. This won't work if a 
list of topics is used as input to KafkaIO, and it still requires passing in the 
topic name manually even when there is only one. In the example snippet above, I 
also include a second argument that is a coder, to use with {{setCoder}} for 
setting up the rest of the pipeline.

In both cases, wrapping the Kafka serializer into the Coder is also not obvious, 
because Kafka serializers have a {{configure}} method that gives them access to 
the consumer/producer config, so this possibly needs to be emulated in the 
coder wrapper.

What do you think?



[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-02-28 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889471#comment-15889471
 ] 

Raghu Angadi commented on BEAM-1573:


[~peay], it is not clear what you are trying to do that is not possible now. 
Can you paste (pseudo) code for either a Beam pipeline (or even a simple 
{{consume(KafkaRecord record)}} method) to illustrate what you want to do? Btw, 
is this for a source or a sink?

Note that if you don't call '.withoutMetadata()' on KafkaIO.read(), it returns 
KafkaRecord, which does contain topic, partition and other metadata.





[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-02-28 Thread peay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888717#comment-15888717
 ] 

peay commented on BEAM-1573:


Actually, thinking more about it, just setting the serializer class in the 
Kafka properties doesn't work in terms of getting a `Write` with the correct 
key/value types.

Maybe we can instead have a method along the lines of:

```
public <V2> Write<K, V2> withCustomKafkaValueSerializer(Serializer<V2> serializer) {
  Map<String, Object> configUpdates = ImmutableMap.of(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer.getClass());
  Map<String, Object> config = updateKafkaProperties(
      producerConfig, TypedWrite.IGNORED_PRODUCER_PROPERTIES, configUpdates);
  return new Write<K, V2>(topic, keyCoder, null, config);
}
```




[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-02-28 Thread peay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888481#comment-15888481
 ] 

peay commented on BEAM-1573:


Yes, if we can agree on a solution. I would suggest just removing the ignore on 
the two parameters used to set custom Kafka serializers/deserializers, but there 
may be some implications I am not seeing as to why it was done this way in the 
first place.



[jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers

2017-02-28 Thread Davor Bonaci (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888476#comment-15888476
 ] 

Davor Bonaci commented on BEAM-1573:


[~rangadi], any comments?
