Re: Writing Tuple2 to a sink

2017-02-23 Thread Tzu-Li (Gordon) Tai
Hi Mohit,

I don’t completely understand your question, but I’m assuming that you know the 
type of records your custom sink will be receiving, but you don’t know how to 
extract values from the records.

Assume that the type of the incoming records will be `Tuple2`. 
When writing your custom sink, you should define that type by:

```
public class YourCustomSink implements SinkFunction> {
    …
    
    public void invoke(Tuple2 next) {
        // use next.f0 / next.f1 to retrieve values from the tuple
    }

    ...
}
```

You can of course also define generic types to replace `String` and `Integer`, 
like so:

```
public class YourCustomSink implements SinkFunction> {
    …
    
    public void invoke(Tuple2 next) {
        F field1 = next.f0;
        S field2 = next.f1;
        ...
    }

    ...
}
```

Just replace the generic types with concrete types when instantiating your 
custom sink, according to your topology.

Let me know if this answers your question!

Cheers,
Gordon

On February 24, 2017 at 10:42:33 AM, 刘彪 (mmyy1...@gmail.com) wrote:

Currently, OutputFormat is used for DataSet, SinkFunction is used for 
DataStream. Maybe I misunderstand your problem. That will be better if you give 
more details.

2017-02-24 5:21 GMT+08:00 Mohit Anchlia :
This works for Kafka but for the other types of sink am I supposed to use some 
type of outputformat?

On Tue, Feb 21, 2017 at 7:13 PM, 刘彪  wrote:
Hi
I think there is a good way in FlinkKafkaProducerBase.java to deal with this 
situation. There is a KeyedSerializationSchema user have to implement.   
KeyedSerializationSchema will be used to serialize data, so that SinkFunction 
just need to understand the type after serialization.
In your case, I think you can add a SerializationSchema interface in 
SinkFunction. And user have to implement the SerializationSchema, maybe named 
Tuple2SerializationSchema. 

2017-02-22 7:17 GMT+08:00 Mohit Anchlia :
What's the best way to retrieve both the values in Tuple2 inside a custom sink 
given that the type is not known inside the sink function?





Re: Writing Tuple2 to a sink

2017-02-23 Thread 刘彪
Currently, OutputFormat is used for DataSet, SinkFunction is used for
DataStream. Maybe I misunderstand your problem. That will be better if you
give more details.

2017-02-24 5:21 GMT+08:00 Mohit Anchlia :

> This works for Kafka but for the other types of sink am I supposed to use
> some type of outputformat?
>
> On Tue, Feb 21, 2017 at 7:13 PM, 刘彪  wrote:
>
>> Hi
>> I think there is a good way in FlinkKafkaProducerBase.java to deal with
>> this situation. There is a KeyedSerializationSchema user have to implement.
>>   KeyedSerializationSchema will be used to serialize data, so that
>> SinkFunction just need to understand the type after serialization.
>> In your case, I think you can add a SerializationSchema interface in
>> SinkFunction. And user have to implement the SerializationSchema, maybe
>> named Tuple2SerializationSchema.
>>
>> 2017-02-22 7:17 GMT+08:00 Mohit Anchlia :
>>
>>> What's the best way to retrieve both the values in Tuple2 inside a
>>> custom sink given that the type is not known inside the sink function?
>>>
>>
>>
>


Re: Writing Tuple2 to a sink

2017-02-23 Thread Mohit Anchlia
This works for Kafka but for the other types of sink am I supposed to use
some type of outputformat?

On Tue, Feb 21, 2017 at 7:13 PM, 刘彪  wrote:

> Hi
> I think there is a good way in FlinkKafkaProducerBase.java to deal with
> this situation. There is a KeyedSerializationSchema user have to implement.
>   KeyedSerializationSchema will be used to serialize data, so that
> SinkFunction just need to understand the type after serialization.
> In your case, I think you can add a SerializationSchema interface in
> SinkFunction. And user have to implement the SerializationSchema, maybe
> named Tuple2SerializationSchema.
>
> 2017-02-22 7:17 GMT+08:00 Mohit Anchlia :
>
>> What's the best way to retrieve both the values in Tuple2 inside a custom
>> sink given that the type is not known inside the sink function?
>>
>
>


Re: Writing Tuple2 to a sink

2017-02-21 Thread 刘彪
Hi
I think there is a good way in FlinkKafkaProducerBase.java to deal with
this situation. There is a KeyedSerializationSchema user have to implement.
  KeyedSerializationSchema will be used to serialize data, so that
SinkFunction just need to understand the type after serialization.
In your case, I think you can add a SerializationSchema interface in
SinkFunction. And user have to implement the SerializationSchema, maybe
named Tuple2SerializationSchema.

2017-02-22 7:17 GMT+08:00 Mohit Anchlia :

> What's the best way to retrieve both the values in Tuple2 inside a custom
> sink given that the type is not known inside the sink function?
>


Writing Tuple2 to a sink

2017-02-21 Thread Mohit Anchlia
What's the best way to retrieve both the values in Tuple2 inside a custom
sink given that the type is not known inside the sink function?