[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891895#comment-15891895
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3314
  
Thanks a lot for your understanding @haohui. 
Let us know once you've updated the PR so that we can review and merge it.


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between inputs and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.
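
(For illustration only: a FlatMap-style schema along the lines described above might look roughly like the sketch below. The interface name and signature are assumptions for illustration, not the API discussed in the PR.)

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.flink.util.Collector;

    /**
     * Hypothetical FlatMap-style deserialization schema: one raw message may
     * produce zero, one, or many elements via the collector, so a corrupt
     * record can simply be dropped instead of failing the job.
     */
    public interface FlatMapDeserializationSchema<T> extends Serializable {

        void deserialize(byte[] message, Collector<T> out) throws IOException;
    }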



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891810#comment-15891810
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3314
  
Thanks for the comments. Allowing `DeserializationSchema` to return `null` 
sounds good to me. I'll update the PR accordingly.




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889101#comment-15889101
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user jgrier commented on the issue:

https://github.com/apache/flink/pull/3314
  
I think it would be just fine if we allowed a null return given the 
tradeoffs discussed here.  The main thing was to allow users a way to deal with 
bad data with minimal effort and without throwing an exception and causing 
their job to restart.




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887668#comment-15887668
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3314
  
@StephanEwen and I just had an offline discussion about the change, and we 
came up with the following thoughts:

Using an `ArrayList` for buffering elements is an "anti-pattern" in Flink, 
because it is not a robust solution. Users could theoretically run into the 
size limit of an array list, and unnesting large messages (in multiple threads 
in the Kafka 0.8 case) can put pressure on the GC. We think that we should try 
to avoid that approach if possible.

Alternative approaches we considered (ordered by preference):
- Define the DeserializationSchema so that users can return `null` if they 
don't want to emit a record.
This approach would barely change the current interface, and is pretty minimal. 
Of course, it would not allow for the "unnesting" use case, where you want to 
emit multiple records from one Kafka message. Users would need to deserialize 
into a nested structure and use a flatMap afterwards to do the un-nesting.
- Move the deserialization into the checkpoint lock. This would allow us to 
collect elements into our internal collector from the user collector while 
still preserving exactly once semantics.
This change would probably be a bit more involved code-wise, as we need to 
rearrange some parts (maybe moving the deserialization schema instance into the 
emitRecord() method, change of some method signatures).
A downside of this approach would be that the Kafka 0.8 consumer threads 
would deserialize records in sequential order (since only one consumer thread 
can hold the lock at a time). For Kafka 0.9 this is already the case. I think 
we can live with that, because the majority of users have moved away from Kafka 0.8 
by now.
- Use the `ArrayList` approach. Users would potentially run into issues and 
we would lose some of Flink's robustness.

@jgrier since you opened the original JIRA back then, what's your take 
on the discussion? How bad would it be for users to just allow the `null`-or-one-record 
approach? (Other opinions are of course also appreciated)
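
For illustration, the first alternative (returning `null` to drop a record) could look roughly like this on the user side. The class name `TolerantIntSchema` is made up, `AbstractDeserializationSchema` is the existing convenience base class, and the skipping itself of course relies on the consumer checking for `null`, which is what this change would add:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

    /**
     * Sketch of the "return null to skip" convention: a record that cannot be
     * parsed yields null, and the consumer drops it instead of failing the job.
     */
    public class TolerantIntSchema extends AbstractDeserializationSchema<Integer> {

        @Override
        public Integer deserialize(byte[] message) throws IOException {
            try {
                return Integer.parseInt(new String(message, StandardCharsets.UTF_8).trim());
            } catch (NumberFormatException e) {
                // corrupt record: skip it by returning null rather than throwing
                return null;
            }
        }
    }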





[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887647#comment-15887647
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r103405663
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * RichKeyedDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
+ * {@see KeyedSerializationSchema}
+ *
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+    /**
+     * Deserializes the byte message.
+     *
+     * @param messageKey the key as a byte array (null if no key has been set)
+     * @param message The message, as a byte array. (null if the message was empty or deleted)
+     * @param partition The partition the message has originated from
+     * @param offset the offset of the message in the original source (for example the Kafka offset)
+     * @param collector the user-provided collector that deserializes the bytes into zero or more records.
+     *
+     * @return The deserialized message as an object.
--- End diff --

The method doesn't return anything.




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887548#comment-15887548
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r103399810
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
--- End diff --

I would put it perhaps in `org.apache.flink.streaming.kafka.serialization` 
under `flink-connector-kafka-base`.




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886750#comment-15886750
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r103339489
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
--- End diff --

Can you please suggest where it should be put?




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15886445#comment-15886445
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3314
  
@rmetzger ping...
just wondering what you think about all the approaches we have discussed 
here? Your comments are appreciated.




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882276#comment-15882276
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102902685
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---
@@ -119,7 +119,7 @@ public Void answer(InvocationOnMock invocation) {
         @SuppressWarnings("unchecked")
         SourceContext<String> sourceContext = mock(SourceContext.class);
         List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
-        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+        RichKeyedDeserializationSchemaWrapper<String> schema = new RichKeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
--- End diff --

This file will have a conflict with the current `master`, because I recently 
pushed a hotfix to `master` to fix the indentation of this file (previously, 
it was incorrectly using spaces to indent instead of tabs). Sorry about this!




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882263#comment-15882263
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102899179
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
+ * {@see KeyedSerializationSchema}
+ *
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+    /**
+     * Deserializes the byte message.
+     *
+     * @param messageKey the key as a byte array (null if no key has been set)
+     * @param message The message, as a byte array. (null if the message was empty or deleted)
+     * @param partition The partition the message has originated from
+     * @param offset the offset of the message in the original source (for example the Kafka offset)
+     *
+     * @return The deserialized message as an object.
+     */
+    void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
+            Collector<T> collector) throws IOException;
--- End diff --

The indentation of the parameters here seems a bit off.
Now that the parameter list is quite lengthy, it might be good style to 
have one parameter per line.
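
For example, roughly:

    void deserialize(
            byte[] messageKey,
            byte[] message,
            String topic,
            int partition,
            long offset,
            Collector<T> collector) throws IOException;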




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882269#comment-15882269
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102897911
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 ---
@@ -422,6 +429,99 @@ public void run() {
        assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
    }
 
+   @Test
+   public void testRichDeserializationSchema() throws Exception {
--- End diff --

I think we should enhance this test to also cover the behaviour with multiple 
`collect`s per record.
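
For example, a schema along these lines (just a sketch against the `RichKeyedDeserializationSchema` interface from this PR; the token-splitting logic is made up for illustration) would exercise several `collect()` calls per record:

    RichKeyedDeserializationSchema<String> splittingSchema = new RichKeyedDeserializationSchema<String>() {
        @Override
        public void deserialize(byte[] messageKey, byte[] message, String topic,
                int partition, long offset, Collector<String> collector) throws IOException {
            // emit one element per comma-separated token: possibly several collect() calls per record
            for (String token : new String(message, StandardCharsets.UTF_8).split(",")) {
                if (!token.isEmpty()) {
                    collector.collect(token);
                }
            }
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return nextElement.equals("end");
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    };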




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882265#comment-15882265
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102898307
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
@@ -422,6 +429,99 @@ public void run() {
         assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
     }
 
+    @Test
+    public void testRichDeserializationSchema() throws Exception {
+        final String topic = "test-topic";
+        final int partition = 3;
+        final byte[] payload = new byte[] {1, 2, 3, 4};
+        final byte[] endPayload = "end".getBytes(StandardCharsets.UTF_8);
+
+        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+            new ConsumerRecord<>(topic, partition, 15, payload, payload),
+            new ConsumerRecord<>(topic, partition, 16, payload, payload),
+            new ConsumerRecord<>(topic, partition, 17, payload, endPayload));
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+        data.put(new TopicPartition(topic, partition), records);
+
+        final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+        // ----- the test consumer -----
+
+        final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+                return consumerRecords;
+            }
+        });
+
+        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- build a fetcher -----
+
+        ArrayList<String> results = new ArrayList<>();
+        SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+        RichKeyedDeserializationSchema<String> schema = new RichKeyedDeserializationSchema<String>() {
+            @Override
+            public void deserialize(
+                    byte[] messageKey, byte[] message, String topic, int partition,
+                    long offset, Collector<String> collector) throws IOException {
+                if (offset != 16) {
+                    collector.collect(new String(message));
+                }
+            }
+
+            @Override
+            public boolean isEndOfStream(String nextElement) {
+                return nextElement.equals("end");
+            }
+
+            @Override
+            public TypeInformation<String> getProducedType() {
+                return BasicTypeInfo.STRING_TYPE_INFO;
+            }
+        };
+
+        final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+            sourceContext,
+            topics,
+            null, /* no restored state */
+            null, /* periodic watermark extractor */
+            null, /* punctuated watermark extractor */
+            new TestProcessingTimeService(),
+            10, /* watermark interval */
+            this.getClass().getClassLoader(),
+            false, /* checkpointing */
+            "task_name",
+            new UnregisteredMetricsGroup(),
+            schema,
+            new Properties(),
+            0L,
+            StartupMode.GROUP_OFFSETS,
+            false);
+
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Thread fetcherRunner = new Thread("fetcher runner") {
--- End diff --

We have a nice utility `CheckedThread` that serves exactly this purpose 
(catching errors and storing a reference to them).
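
A rough sketch of how that could be used here (assuming `org.apache.flink.core.testutils.CheckedThread` with its `go()`/`sync()` methods; `fetcher` refers to the one built in the test above):

    final CheckedThread fetcherRunner = new CheckedThread() {
        @Override
        public void go() throws Exception {
            // run the fetch loop; any exception is captured by CheckedThread
            fetcher.runFetchLoop();
        }
    };
    fetcherRunner.start();

    // ... after the fetcher has been shut down:
    fetcherRunner.sync();  // re-throws any exception thrown inside go()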



[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882259#comment-15882259
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102898788
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
--- End diff --

The name of the class is `RichKeyedDeserializationSchema`, but the Javadoc 
mentions `RichDeserializationSchema`.




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882268#comment-15882268
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102901123
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
 ---
@@ -171,8 +171,9 @@ private static String getResourceFilename(String filename) {
         private final List<KafkaTopicPartition> partitions;
 
         @SuppressWarnings("unchecked")
-        DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) {
-            super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class));
+        DummyFlinkKafkaConsumer(
+                List<KafkaTopicPartition> partitions) {
--- End diff --

If it's just one parameter, I don't think we need a new line.




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882264#comment-15882264
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102896783
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -176,7 +177,7 @@ public FlinkKafkaConsumer08(List topics, DeserializationSchema deseri
      * @param props
      *   The properties that are used to configure both the fetcher and the offset handler.
      */
-    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+    public FlinkKafkaConsumer08(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {
--- End diff --

This will break user code. We'll need a proper usage migration here.

We have a separate JIRA that aims at deprecating the current Kafka consumer 
constructors: https://issues.apache.org/jira/browse/FLINK-5704. The migration 
to the new flat-map deserializer can be included there.

Perhaps for this PR, we should just use your 
`RichKeyedDeserializationSchemaWrapper` as a "behaviour bridge" from the original 
deserialization schema to the new one, and not change the original 
constructors / add new constructors yet, so that we don't overlap with and 
complicate things for FLINK-5704. 




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882258#comment-15882258
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102901299
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -1236,10 +1237,11 @@ public Tuple2WithTopicSchema(ExecutionConfig ec) {
}
 
@Override
-    public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+    public void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
+            Collector<Tuple3<Integer, Integer, String>> collector) throws IOException {
--- End diff --

Same here: the indentation formatting seems off.




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882266#comment-15882266
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102896891
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -121,7 +122,7 @@ public FlinkKafkaConsumer010(List topics, DeserializationSchema deser
      * @param props
      *   The properties that are used to configure both the fetcher and the offset handler.
      */
-    public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+    public FlinkKafkaConsumer010(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {
--- End diff --

Same as in the comment in `FlinkKafkaConsumer08`: this breaks user code.




[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882262#comment-15882262
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102900986
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
             final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
 
             // get the records for each topic partition
-            for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+            for (final KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
 
                 List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                         records.records(partition.getKafkaPartitionHandle());
 
-                for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-                    final T value = deserializer.deserialize(
-                            record.key(), record.value(),
-                            record.topic(), record.partition(), record.offset());
-
-                    if (deserializer.isEndOfStream(value)) {
-                        // end of stream signaled
-                        running = false;
-                        break;
-                    }
-
-                    // emit the actual record. this also updates offset state atomically
-                    // and deals with timestamps and watermark generation
-                    emitRecord(value, partition, record.offset(), record);
+                for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+                    final Collector<T> collector = new Collector<T>() {
+                        @Override
+                        public void collect(T value) {
+                            if (deserializer.isEndOfStream(value)) {
+                                // end of stream signaled
+                                running = false;
+                            } else {
+                                // emit the actual record. this also updates offset state atomically
+                                // and deals with timestamps and watermark generation
+                                try {
+                                    emitRecord(value, partition, record.offset(), record);
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                        }
+
+                        @Override
+                        public void close() {
+
+                        }
+                    };
+
+                    deserializer.deserialize(
                        record.key(), record.value(),
                        record.topic(), record.partition(), record.offset(), collector);
--- End diff --

The formatting for the list of arguments here could be nicer. Perhaps one 
argument per line?
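
That is, roughly:

    deserializer.deserialize(
        record.key(),
        record.value(),
        record.topic(),
        record.partition(),
        record.offset(),
        collector);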



[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882270#comment-15882270
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102900820
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
@@ -176,7 +177,7 @@ public FlinkKafkaConsumer08(List topics, DeserializationSchema deseri
	 * @param props
	 *   The properties that are used to configure both the fetcher and the offset handler.
	 */
-	public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema deserializer, Properties props) {
+	public FlinkKafkaConsumer08(List topics, RichKeyedDeserializationSchema deserializer, Properties props) {
		super(topics, deserializer);
--- End diff --

So, instead of changing the constructor, we should still do
`super(topics, new RichKeyedDeserializationSchemaWrapper(deserializer))`
here.
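
For context, a minimal sketch of what such a wrapper could look like. This is not
the code from the PR; the exact method set of `RichKeyedDeserializationSchema` is an
assumption pieced together from the diffs in this thread:

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import java.io.IOException;

/**
 * Sketch only: adapts an existing one-in-one-out KeyedDeserializationSchema to the
 * collector-based RichKeyedDeserializationSchema, so that the old constructors can
 * keep accepting the old interface.
 */
public class RichKeyedDeserializationSchemaWrapper<T> implements RichKeyedDeserializationSchema<T> {

	private static final long serialVersionUID = 1L;

	private final KeyedDeserializationSchema<T> wrapped;

	public RichKeyedDeserializationSchemaWrapper(KeyedDeserializationSchema<T> wrapped) {
		this.wrapped = wrapped;
	}

	@Override
	public void deserialize(byte[] messageKey, byte[] message, String topic,
			int partition, long offset, Collector<T> out) throws IOException {
		// the wrapped schema produces exactly one element per record, so just forward it
		out.collect(wrapped.deserialize(messageKey, message, topic, partition, offset));
	}

	@Override
	public boolean isEndOfStream(T nextElement) {
		return wrapped.isEndOfStream(nextElement);
	}

	@Override
	public TypeInformation<T> getProducedType() {
		return wrapped.getProducedType();
	}
}
```

With a wrapper like this, the `super(topics, new RichKeyedDeserializationSchemaWrapper(deserializer))`
call above keeps the existing constructor signatures source-compatible.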


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882267#comment-15882267
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102900064
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
--- End diff --

Since this is now a very Kafka-specific class, I think this is a good time 
to change the package path to `org.apache.flink.streaming.kafka.serialization`.

The original `KeyedDeserializationSchema` was placed under 
`o.a.f.s.util.serialization` because it was wrongly packaged in another module 
before, and moved to `flink-connector-kafka-base` under the same package path 
to avoid breaking user code.


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882261#comment-15882261
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102898901
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
+ * {@see KeyedSerializationSchema}
--- End diff --

I'm not sure why we need to link to `KeyedSerializationSchema` in the 
Javadocs for the new serialization schema.
From what I know, we're going to completely replace it, correct?


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882260#comment-15882260
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102900238
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchemaWrapper.java ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+
+public class RichKeyedDeserializationSchemaWrapper implements RichKeyedDeserializationSchema {
--- End diff --

Can you also include Javadocs for this class?


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881992#comment-15881992
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881632
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
						keyPayload.get(keyBytes);
					}

-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector collector = new Collector() {
--- End diff --

@haohui, if you don't mind, I would also wait for @rmetzger to take another 
look at the new proposals here before you jump back into the code.
This part is quite critical for the Flink Kafka consumer's exactly-once 
guarantee, so another pair of eyes on it will be safer.

I would also like to do a thorough pass over your code to see if there are 
other problems, so you can work on them all together.

Is that ok for you? Sorry for some more waiting.


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881989#comment-15881989
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881264
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
						keyPayload.get(keyBytes);
					}

-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector collector = new Collector() {
--- End diff --

I see what you are saying. The trade-off here is handing off the objects 
one more time, but I think it's okay. I'll update the PR accordingly.


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881986#comment-15881986
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102881092
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
						keyPayload.get(keyBytes);
					}

-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector collector = new Collector() {
--- End diff --

@haohui hmm this seems a bit odd to me. I think it should be achievable.

```
// the buffer; this can be shared
final List bufferedElements = new LinkedList<>();
// BufferCollector is an implementation of Collector that adds collected elements to bufferedElements; this can be shared
final BufferCollector collector = new BufferCollector(bufferedElements);

...

for (final ConsumerRecord record : partitionRecords) {
	deserializer.deserialize(
		record.key(), record.value(), record.topic(),
		record.partition(), record.offset(), collector);

	emitRecords(bufferedElements, partitionState, record.offset(), record);

	// after the elements for the record have been emitted, empty out the buffer
	bufferedElements.clear();
}
```

Doesn't this work? I haven't really tried this hands-on, so I might be 
overlooking something. Let me know what you think :)
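
For readers following along, here is a minimal sketch of the `BufferCollector` assumed
in the snippet above. No such class exists in Flink; it only appends every collected
element to a shared list, which the fetch loop drains and clears after each record:

```
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * Sketch of the buffering collector assumed above: collect() only appends to a
 * shared list; the fetch loop emits and clears that list after each deserialize()
 * call, under the checkpoint lock.
 */
final class BufferCollector<T> implements Collector<T> {

	private final List<T> bufferedElements;

	BufferCollector(List<T> bufferedElements) {
		this.bufferedElements = bufferedElements;
	}

	@Override
	public void collect(T element) {
		bufferedElements.add(element);
	}

	@Override
	public void close() {
		// nothing to release; the buffer is owned and cleared by the fetch loop
	}
}
```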


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881376#comment-15881376
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102830609
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
						keyPayload.get(keyBytes);
					}

-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector collector = new Collector() {
--- End diff --

Good catch, @tzulitai !

I tried the buffer approach and had no luck. The problem is that calling 
`emitRecord` needs to pass in both the offset and the record itself -- the 
record is used to extract the timestamp in the Kafka 0.10 consumers. The buffer 
would therefore need to hold both the deserialized value and the record, so it 
cannot solve the problem of having a collector per record.
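
To make that concern concrete, a shared buffer would have to carry roughly this much
per element (illustrative only; `BufferedElement` is not a class in the PR, and the
`ConsumerRecord<byte[], byte[]>` generics are an assumption about the consumer setup):

```
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Illustrative only: what a shared buffer entry would need to hold so that
 * emitRecord(value, partition, offset, record) can still be called after
 * deserialize() returns; the Kafka 0.10 path reads the timestamp from the record.
 */
final class BufferedElement<T> {

	final T value;
	final long offset;
	final ConsumerRecord<byte[], byte[]> record;

	BufferedElement(T value, long offset, ConsumerRecord<byte[], byte[]> record) {
		this.value = value;
		this.offset = offset;
		this.record = record;
	}
}
```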


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880154#comment-15880154
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102668004
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
						keyPayload.get(keyBytes);
					}

-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector collector = new Collector() {
--- End diff --

What I think we should do to solve this correctly:

Buffer the elements collected from the `deserialize` call. The 
`Collector.collect` implementation should simply add the collected element to 
the buffer, and not emit it immediately.

After `deserialize` returns, call `emitRecord` once with all the elements 
in the buffer and the original record's offset. This, of course, would mean we 
need to slightly change the `emitRecord` implementation a bit to something like:
```
void emitRecord(List records, KafkaTopicPartitionState partitionState, long offset) {
	synchronized (checkpointLock) {
		for (T record : records) {
			sourceContext.collect(record);
		}
		partitionState.setOffset(offset);
	}
}
```

After this, we proceed with the next record and repeat. Note that emitting all 
elements produced from the record at offset 100L and updating the offset state 
to 100L happen atomically, synchronized on the checkpoint lock, so we can be 
sure that a checkpoint barrier can only come either before or after all the 
elements produced from offset 100, and never in between.

I think we should also be able to avoid a per-record `Collector` with this 
solution. We can reuse one `Collector` and hand it to the `deserializer` for 
every record, because it is simply a means to collect elements into the 
internal buffer and we are not calling `emitRecords` from inside it.


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rathe

[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880142#comment-15880142
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102665687
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
						keyPayload.get(keyBytes);
					}

-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector collector = new Collector() {
--- End diff --

Moving the discussion back a bit:

I don't think this implementation works correctly with exactly-once and how 
we checkpoint the consumer's partition offset state.

The problem is that, in `emitRecord`, we will be updating the offset state. 
In the changes here, what this means is that we will be considering a record to 
have been fully processed as soon as the collector collects something.

For example, let's say the deserializer calls `collect` 3 times for elements 
deserialized from record R before `deserialize` returns, and R has offset 100L. 
As soon as the first element is collected, the state will be updated to 
`finished processing offset 100L`. If a checkpoint is triggered now, and we 
later use that checkpoint to restore, we will skip the remaining 2 elements 
that were yet to be collected.





> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878971#comment-15878971
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102542018
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
						keyPayload.get(keyBytes);
					}

-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector collector = new Collector() {
--- End diff --

@StephanEwen What is your opinion on solving this problem?



> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876573#comment-15876573
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102299038
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
						keyPayload.get(keyBytes);
					}

-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector collector = new Collector() {
--- End diff --

Totally agree. I played around with this a little bit, and it might require 
some trade-offs here.

The problem is that `emitRecord()` needs per-record state (e.g., the topic 
partition, offset, etc.). That state can either be passed inside a closure 
(like the new instance of the `Collector`) or passed through arguments. I see 
three possibilities here:

1. Create a new instance of `Collector` for every record. The JVM may or 
may not be able to optimize this away. A trace-based JVM should be able to, 
but I'm not sure about class-based JVMs.

2. Expose the internal state in the `collect()` call. The `collect()` call 
takes additional parameters such as the offset and the partition state. This 
reduces GC overhead but also makes it harder to change the implementation later.

3. Create a new interface like `Optional deserialize(byte[] messageKey, ...)` (or 
`void deserialize(byte[] messageKey, ..., AtomicReference result)` to 
optimize away the cost of the `Optional` class); see the sketch below. It results 
in a slightly more complex API, but it probably has the best trade-off between 
performance and API compatibility.

What do you think?
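
To make option 3 a bit more concrete, here is a rough sketch of the two signature
shapes mentioned above. This is purely illustrative: neither interface exists in
Flink, the names are made up, and the full parameter list is borrowed from the
existing `KeyedDeserializationSchema.deserialize(...)` arguments shown in the diffs:

```
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Variant A (illustrative): return at most one element; Optional.empty() means "skip this record".
interface OptionalReturningDeserializationSchema<T> {

	Optional<T> deserialize(byte[] messageKey, byte[] message,
			String topic, int partition, long offset) throws IOException;
}

// Variant B (illustrative): avoid allocating an Optional by writing the result (or null) into a holder.
interface HolderBasedDeserializationSchema<T> {

	void deserialize(byte[] messageKey, byte[] message,
			String topic, int partition, long offset,
			AtomicReference<T> result) throws IOException;
}
```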



> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874745#comment-15874745
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102048656
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
			final ConsumerRecords records = handover.pollNext();

			// get the records for each topic partition
-			for (KafkaTopicPartitionState partition : subscribedPartitions()) {
+			for (final KafkaTopicPartitionState partition : subscribedPartitions()) {

				List> partitionRecords =
					records.records(partition.getKafkaPartitionHandle());

-				for (ConsumerRecord record : partitionRecords) {
-					final T value = deserializer.deserialize(
-						record.key(), record.value(),
-						record.topic(), record.partition(), record.offset());
-
-					if (deserializer.isEndOfStream(value)) {
-						// end of stream signaled
-						running = false;
-						break;
-					}
-
-					// emit the actual record. this also updates offset state atomically
-					// and deals with timestamps and watermark generation
-					emitRecord(value, partition, record.offset(), record);
+				for (final ConsumerRecord record : partitionRecords) {
+					final Collector collector = new Collector() {
--- End diff --

Same question as in the Kafka 0.8 impl


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874748#comment-15874748
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3314
  
Thank you for opening a pull request.
I think the change is missing an update to the documentation. I did a very 
very superficial review of the change :) This needs a more thorough check.


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874739#comment-15874739
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r102048475
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
						keyPayload.get(keyBytes);
					}

-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-						currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector collector = new Collector() {
--- End diff --

I'm not sure about the performance implications of this. The JVM will create 
a Collector instance for each record read from Kafka.
I wonder if we can re-use one collector instance here.


Also, I wonder if we need to use this `Collector` implementation at all, with a 
`close()` method we are not using and an exception we are turning into a 
`RuntimeException`. Maybe we should let `collect` throw an exception instead?


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-17 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15873009#comment-15873009
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3679:


Hi [~wheat9], sure! I've noticed your PR, and will schedule some time next week 
to review it ;-)
Thank you for the reminder.

> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-17 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872506#comment-15872506
 ] 

Haohui Mai commented on FLINK-3679:
---

[~tzulitai] -- would you mind taking a look?

> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866919#comment-15866919
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3314

[FLINK-3679] DeserializationSchema should handle zero or more outputs

This PR adds a new interface, `RichKeyedDeserializationSchema`, to enable 
the deserializer to produce zero or more outputs per input. The main use case 
is skipping corrupted messages in the Kafka stream.

Feedback (especially on backward compatibility) is highly appreciated.
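
For reference, a minimal sketch of what the proposed interface could look like,
pieced together from the diffs in this thread. The exact method set, generics, and
Javadoc wording are assumptions, not the final API:

```
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.io.Serializable;

/**
 * Sketch only: a keyed deserialization schema that may emit zero, one, or many
 * elements per Kafka record by pushing them into a Collector instead of
 * returning a single value.
 */
public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Deserializes one Kafka record. Emitting nothing skips the record (e.g. corrupted
	 * data); emitting several elements fans one record out into multiple pipeline elements.
	 */
	void deserialize(byte[] messageKey, byte[] message, String topic,
			int partition, long offset, Collector<T> out) throws IOException;

	/**
	 * Signals end of stream; when this returns true for an element, the source stops consuming.
	 */
	boolean isEndOfStream(T nextElement);
}
```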

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-3679

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3314.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3314


commit 7728acb3bc00a12a7552706be569710fbfdbd200
Author: Haohui Mai 
Date:   2017-02-14T22:19:29Z

[FLINK-3679] DeserializationSchema should handle zero or more outputs for 
every input.






[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15849633#comment-15849633
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3679:


Hi [~wheat9]!
Thank you for picking this JIRA up. How are you doing with this work? Since the 
previous discussion didn't really come to a conclusion on the API changes for 
this feature yet, can you briefly describe how you plan to add this?

Please also feel free to reach out to us if you want to bounce some ideas around 
or if you bump into any problems along the way.



[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456204#comment-15456204
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3679:


+1 to fixing the issue; the proposed changes seem reasonable and head towards a 
better API. The Kinesis consumer will need to adapt to this change as well, as 
it also accepts a DeserializationSchema.



[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-08-29 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15445795#comment-15445795
 ] 

Robert Metzger commented on FLINK-3679:
---

Two users were affected by this recently:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Handle-deserialization-error-td8724.html#a8725
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Handling-Kafka-DeserializationSchema-exceptions-td8700.html

I think we need to fix this issue.



[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-04-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15224692#comment-15224692
 ] 

Jamie Grier commented on FLINK-3679:


I'm not sure about the locking and operator chaining issues, so I would say that 
if the change makes those unduly complicated, maybe it's not worth it.  
However, a DeserializationSchema with more flatMap()-like semantics would 
certainly be the better API, given that bad data issues are a reality.  It also 
seems we could provide this without breaking existing code, but it would 
certainly add a bit more complexity to the API (having multiple variants for this).

Anyway, I agree you can work around this issue by making a special "sentinel" 
value and dealing with all of this in a chained flatMap() operator.  I 
imagine that's exactly the approach that people are already using.
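
For reference, a minimal sketch of that sentinel workaround, assuming a plain 
string schema. AbstractDeserializationSchema and FlatMapFunction are existing 
Flink classes, though their exact package locations can differ between Flink 
versions, and the sentinel constant is made up for the example:

```java
// Sentinel workaround sketch: the schema never throws for bad data, it emits
// a marker value instead, and a chained flatMap() filters that marker out.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
import org.apache.flink.util.Collector;

import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

public class SentinelWorkaround {

    /** Marker emitted for records that could not be deserialized. */
    public static final String CORRUPT_SENTINEL = "__CORRUPT_RECORD__";

    /** Schema that maps undecodable bytes to the sentinel instead of throwing. */
    public static class LenientStringSchema extends AbstractDeserializationSchema<String> {
        @Override
        public String deserialize(byte[] message) {
            try {
                return StandardCharsets.UTF_8.newDecoder()
                        .decode(ByteBuffer.wrap(message)).toString();
            } catch (CharacterCodingException e) {
                return CORRUPT_SENTINEL;
            }
        }
    }

    /** flatMap() chained right after the source; drops the sentinel values. */
    public static class DropSentinel implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) {
            if (!CORRUPT_SENTINEL.equals(value)) {
                out.collect(value);
            }
        }
    }
}
```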





[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-04-04 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15224182#comment-15224182
 ] 

Robert Metzger commented on FLINK-3679:
---

I had a quick offline chat about this with [~StephanEwen]. Changing the 
semantics of the DeserializationSchema to use an OutputCollector would be 
possible, but it would break existing code, introduce a new class and make the 
locking / operator chaining of the Kafka consumer code more complicated.
I wonder if the problems you've mentioned can't be solved with a flatMap() 
operator. When the Kafka consumer and the flatMap() are executed with the same 
parallelism, they'll be chained together and then executed in the same thread 
with almost no overhead.
If one Kafka message results in two or more logical messages, that "splitting" 
can be done in the flatMap() as well. For invalid records, this can also be 
reflected in the returned record (with a failure flag such as an id set to -1 or 
a boolean set to false, or a special field in a JSON record) and then handled 
accordingly in the flatMap() call.
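
As a small sketch of that pattern, assuming a made-up Envelope record that the 
schema fills with a validity flag and the parsed elements; a chained flatMap() 
then skips invalid records and splits multi-element ones:

```java
// Sketch of the failure-flag + splitting pattern handled in a chained flatMap().
// The Envelope type is illustrative; any record carrying a validity flag works.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

public class EnvelopeFlatMap implements FlatMapFunction<EnvelopeFlatMap.Envelope, String> {

    /** What the DeserializationSchema returns: a validity flag plus the parsed payload. */
    public static class Envelope implements Serializable {
        public boolean valid;
        public List<String> payload = Collections.emptyList();
    }

    @Override
    public void flatMap(Envelope value, Collector<String> out) {
        if (!value.valid) {
            return;                   // drop records the schema could not parse
        }
        for (String element : value.payload) {
            out.collect(element);     // one Kafka message may yield several elements
        }
    }
}
```

Keeping the failure flag on the returned record instead of throwing keeps the 
schema total, so bad data never brings down the source.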

If you want, we can keep the JIRA issue open and see if more users run into 
this. If so, we can reconsider fixing it (I'm not saying I've decided against 
fixing it).
