Consuming a Kafka topic with multiple partitions from Flink

2017-08-28 Thread Isuru Suriarachchi
Hi all,

I'm trying to implement a Flink consumer which consumes a Kafka topic with
3 partitions. I've set the parallelism of the execution environment to 3 as
I want to make sure that each Kafka partition is consumed by a separate
parallel task in Flink. My first question is whether it's always guaranteed
to have a one-to-one mapping between Kafka partitions and Flink tasks in
this setup?

So far, I've just set up a single Kafka broker and created a topic with 3
partitions, and tried to consume it from my Flink application with
parallelism set to 3 (all on the same machine). I see 3 parallel instances of
each operation being created in the Flink log. However, when I execute the
Flink job, messages from all 3 Kafka partitions are consumed by a single
task (Process (3/3)). The other two parallel tasks are idle. Am I missing
something here? In addition to setting the parallelism, is there any other
configuration that I have to do here?

Here are the details about my setup.

Kafka version: 0.10.2.1
Flink version: 1.3.1
Connector: FlinkKafkaConsumer010
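
For reference, a minimal sketch of the consumer wiring I'm describing (broker
address, group id, topic name, and deserialization schema are placeholders):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ThreePartitionConsumer {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One parallel source subtask per Kafka partition of the 3-partition topic.
        env.setParallelism(3);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "my-consumer-group");       // placeholder group id

        // The consumer distributes the topic's partitions across its parallel subtasks.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("three-partition-consumer");
    }
}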

Thanks,
Isuru


Re: Flink Elastic Sink AWS ES

2017-08-28 Thread arpit srivastava
It seems the AWS ES setup is hiding the nodes' IPs.

Then I think you can try @vinay patil's solution.
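
For reference, the transport-address based setup discussed further down in
this thread looks roughly like the sketch below, using the ES 2.x connector
(cluster name, index/type, and node addresses are placeholders):

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class TransportAddressSink {

    public static void attach(DataStream<String> input) throws UnknownHostException {
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "my-cluster-name"); // must match the ES cluster name
        config.put("bulk.flush.max.actions", "1");

        // Node IPs discovered via _cat/nodes; port 9300 must be reachable from Flink.
        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

        input.addSink(new ElasticsearchSink<>(config, transportAddresses,
                new ElasticsearchSinkFunction<String>() {
                    private IndexRequest indexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        return Requests.indexRequest()
                                .index("my-index")  // placeholder index
                                .type("my-type")    // placeholder type
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(indexRequest(element));
                    }
                }));
    }
}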

Thanks,
Arpit



On Tue, Aug 29, 2017 at 3:56 AM, ant burton  wrote:

> Hey Arpit,
>
> _cat/nodes?v=ip,port
>
>
> returns the following (I have not added the x's; they were returned in
> the response):
>
> ip port
>
> x.x.x.x 9300
>
> Thank you for your help
>
> Anthony
>
>
> On 28 Aug 2017, at 10:34, arpit srivastava  wrote:
>
> Hi Ant,
>
> Can you try this.
>
> curl -XGET 'http:///_cat/nodes?v=ip,port'
>
> This should give you ip and port
>
> On Mon, Aug 28, 2017 at 3:42 AM, ant burton  wrote:
>
>> Hi Arpit,
>>
>> The response from _nodes doesn't contain an IP address in my case. Is
>> this something that you experienced?
>>
>> curl -XGET 'http:///_nodes'
>>>
>>>
>> Thanks,
>>
>>
>> On 27 Aug 2017, at 14:32, ant burton  wrote:
>>
>> Thanks! I'll check later this evening.
>>
>> On Sun, 27 Aug 2017 at 07:44, arpit srivastava 
>> wrote:
>>
>>> We also had the same setup, where the ES cluster was behind a proxy server
>>> that listened on port 80 and redirected to port 9200 on the ES cluster.
>>>
>>> For using Flink, we got the actual IP addresses of the ES nodes and put
>>> them in the addresses below.
>>>
>>> transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
>>> transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
>>>
>>> But this worked only because port 9300 was open on the ES nodes in our setup
>>> and so accessible from our Flink cluster.
>>>
>>> Get your node list on your ES Cluster using
>>>
>>> curl -XGET 'http:///_nodes'
>>>
>>>
>>>
>>> and then check whether you can telnet to that  on port 9300
>>> from your Flink cluster nodes
>>>
>>> $ telnet  9300
>>>
>>> If this works then you can use the above solution.
>>>
>>>
>>> On Sun, Aug 27, 2017 at 4:09 AM, ant burton 
>>> wrote:
>>>
 Hi Ted,

 Changing the port from 9300 to 9200 in the example you provided causes
 the error in my original message.

 My apologies for not providing context in the form of code in my
 original message. To confirm, I am using the example you provided in my
 application and have it working on port 9300 in a local Docker
 environment.

 Thanks,

 On 26 Aug 2017, at 23:24, Ted Yu  wrote:

 If port 9300 in the following example is replaced by 9200, would that
 work ?

 https://ci.apache.org/projects/flink/flink-docs-release-1.3/
 dev/connectors/elasticsearch.html

 Please use Flink 1.3.1+

 On Sat, Aug 26, 2017 at 3:00 PM, ant burton 
 wrote:

> Hello,
>
> Has anybody been able to use the Flink Elasticsearch connector to sink
> data to AWS ES?
>
> I don't believe this is possible, as AWS ES only allows access to port
> 9200 (via port 80) on the master node of the ES cluster, and not port 9300
> used by the Flink Elasticsearch connector.
>
> The error message that occurs when attempting to connect to AWS ES via
> port 80 (9200) with the Flink Elasticsearch connector is:
>
> Elasticsearch client is not connected to any Elasticsearch nodes!
>
> Could anybody confirm the above, and if possible provide an
> alternative solution?
>
> Thank you,




>>>
>>
>
>


Re: Null Pointer Exception on Trying to read a message from Kafka

2017-08-28 Thread Sridhar Chellappa
OK. I got past the problem. Basically, I had to change my serialization
schema as follows (the changes are marked with comments):

public class MyKafkaMessageSerDeSchema implements
        DeserializationSchema<MyKafkaMessage>,
        SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        MyKafkaMessage parsedMessage = null;
        try {
            parsedMessage = MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            // note: returning from finally swallows any other exception
            return parsedMessage;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        // added the type info (was returning null before)
        return TypeExtractor.getForClass(MyKafkaMessage.class);
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        // modified the serializer (protobuf messages serialize via toByteArray())
        return element.toByteArray();
    }
}


When I run my program, I get another exception :


java.lang.NullPointerException
at 
shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
at java.util.AbstractList.add(AbstractList.java:108)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)


On Tue, Aug 29, 2017 at 8:43 AM, Ted Yu  wrote:

> The NPE came from this line:
>
> StreamRecord copy = castRecord.copy(serializer.copy(castRecord.getValue()));
>
> Either serializer or castRecord was null.
>
> I wonder if this has been fixed in 1.3.2 release.
>
> On Mon, Aug 28, 2017 at 7:24 PM, Sridhar Chellappa 
> wrote:
>
>> Kafka Version is 0.10.0
>>
>> On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa 
>> wrote:
>>
>>> 1.3.0
>>>
>>> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu  wrote:
>>>
 Which Flink version are you using (so that line numbers can be matched
 

Re: Null Pointer Exception on Trying to read a message from Kafka

2017-08-28 Thread Ted Yu
The NPE came from this line:

StreamRecord copy =
castRecord.copy(serializer.copy(castRecord.getValue()));

Either serializer or castRecord was null.

I wonder if this has been fixed in 1.3.2 release.

On Mon, Aug 28, 2017 at 7:24 PM, Sridhar Chellappa 
wrote:

> Kafka Version is 0.10.0
>
> On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa 
> wrote:
>
>> 1.3.0
>>
>> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu  wrote:
>>
>>> Which Flink version are you using (so that line numbers can be matched
>>> with source code) ?
>>>
>>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa >> > wrote:
>>>
 DataStream MyKafkaMessageDataStream = env.addSource(
 getStreamSource(env, parameterTool);
 );



 public RichParallelSourceFunction
 getStreamSource(StreamExecutionEnvironment env, ParameterTool
 parameterTool) {

// MyKAfkaMessage is a ProtoBuf message
 env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);

 KafkaDataSource flinkCepConsumer =
 new KafkaDataSource(parameterTool,
 new MyKafkaMessageSerDeSchema());

 return flinkCepConsumer;
 }


 public class KafkaDataSource extends FlinkKafkaConsumer010 {

 public KafkaDataSource(ParameterTool parameterTool,
 DeserializationSchema deserializer) {
 super(
 Arrays.asList(parameterTool.getRequired("topic").split(",")),
 deserializer,
 parameterTool.getProperties()
 );

 }

 }

 public class MyKafkaMessageSerDeSchema implements
 DeserializationSchema, SerializationSchema
 {

 @Override
 public MyKafkaMessage deserialize(byte[] message) throws
 IOException {
 MyKafkaMessage MyKafkaMessage = null;
 try {
 MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
 } catch (InvalidProtocolBufferException e) {
 e.printStackTrace();
 } finally {
 return MyKafkaMessage;
 }
 }

 @Override
 public boolean isEndOfStream(MyKafkaMessage nextElement) {
 return false;
 }

 @Override
 public TypeInformation getProducedType() {
 return null;
 }

 @Override
 public byte[] serialize(MyKafkaMessage element) {
 return new byte[0];
 }
 }

 On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu  wrote:

> Which version of Flink / Kafka are you using ?
>
> Can you show the snippet of code where you create the DataStream ?
>
> Cheers
>
> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <
> flinken...@gmail.com> wrote:
>
>> Folks,
>>
>> I have a KafkaConsumer that I am trying to read messages from. When I
>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>> the following exception :
>>
>> Any idea on how can this happen?
>>
>> java.lang.NullPointerException
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>  at 
>> 

Re: Question about watermark and window

2017-08-28 Thread Tony Wei
Hi Aljoscha,

It is very helpful for me to understand the behavior in such a scenario.
Thank you very much!!!

Best Regards,
Tony Wei

2017-08-28 20:00 GMT+08:00 Aljoscha Krettek :

> Hi Tony,
>
> I think your analyses are correct. Especially, yes, if you re-read the
> data the (ts=3) data should still be considered late if both consumers read
> with the same speed. If, however, (ts=3) is read before the other consumer
> reads (ts=8) then it should not be considered late, as you said.
>
> Best,
> Aljoscha
>
> > On 24. Aug 2017, at 15:49, Tony Wei  wrote:
> >
> > Hi,
> >
> > Recently, I studied about watermark from Flink documents and blogs.
> >
> > I have some question about this scenario below.
> >
> > Suppose there are five clients sending events with different time to the
> topic on Kafka.
> > Topic has two partitions and five events' timestamp are (ts=1), (ts=2),
> (ts=3), (ts=8), (ts=9).
> > The Flink streaming job uses the following setting:
> > 1. use AscendingTimestampExtractor
> > 2. client time as timestamp
> > 3. use tumbling window with 5 unit window size
> > 4. not allow late event
> >
> > If the client events out of order like this.
> >   Partition A [(ts=1), (ts=8)]
> >   Partition B [(ts=2), (ts=9)]  <= (ts=3) delay
> > Should the window function emit [(ts=1), (ts=2)], keep [(ts=8), (ts=9)]
> > in state, and drop (ts=3)?
> >
> > If all events has come, and then replay the job from the beginning, the
> partition state would be
> >   Partition A [(ts=1), (ts=8)]
> >   Partition B [(ts=2), (ts=9), (ts=3)]
> > Suppose two consumers fetch events with same speed, should the result be
> the same as above?
> > If consumer B reads (ts=3) earlier than consumer A reads (ts=8), would
> (ts=3) be placed in the window before watermark becomes to 8 and then emit
> [(ts=1), (ts=2), (ts=3)] as result?
> >
> > I wonder if those assumptions are all correct. If not, is there any
> > mechanism around watermarks and windows in Flink that I missed?
> >
> > Thank for your help.
> >
> > Best Regards,
> > Tony Wei
> >
>
>


Re: Null Pointer Exception on Trying to read a message from Kafka

2017-08-28 Thread Sridhar Chellappa
Kafka Version is 0.10.0

On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa 
wrote:

> 1.3.0
>
> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu  wrote:
>
>> Which Flink version are you using (so that line numbers can be matched
>> with source code) ?
>>
>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa 
>> wrote:
>>
>>> DataStream MyKafkaMessageDataStream = env.addSource(
>>> getStreamSource(env, parameterTool);
>>> );
>>>
>>>
>>>
>>> public RichParallelSourceFunction
>>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>>> parameterTool) {
>>>
>>>// MyKAfkaMessage is a ProtoBuf message
>>> 
>>> env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
>>> ProtobufSerializer.class);
>>>
>>> KafkaDataSource flinkCepConsumer =
>>> new KafkaDataSource(parameterTool,
>>> new MyKafkaMessageSerDeSchema());
>>>
>>> return flinkCepConsumer;
>>> }
>>>
>>>
>>> public class KafkaDataSource extends FlinkKafkaConsumer010 {
>>>
>>> public KafkaDataSource(ParameterTool parameterTool,
>>> DeserializationSchema deserializer) {
>>> super(
>>> Arrays.asList(parameterTool.getRequired("topic").split(",")),
>>> deserializer,
>>> parameterTool.getProperties()
>>> );
>>>
>>> }
>>>
>>> }
>>>
>>> public class MyKafkaMessageSerDeSchema implements
>>> DeserializationSchema, SerializationSchema
>>> {
>>>
>>> @Override
>>> public MyKafkaMessage deserialize(byte[] message) throws IOException
>>> {
>>> MyKafkaMessage MyKafkaMessage = null;
>>> try {
>>> MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
>>> } catch (InvalidProtocolBufferException e) {
>>> e.printStackTrace();
>>> } finally {
>>> return MyKafkaMessage;
>>> }
>>> }
>>>
>>> @Override
>>> public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>> return false;
>>> }
>>>
>>> @Override
>>> public TypeInformation getProducedType() {
>>> return null;
>>> }
>>>
>>> @Override
>>> public byte[] serialize(MyKafkaMessage element) {
>>> return new byte[0];
>>> }
>>> }
>>>
>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu  wrote:
>>>
 Which version of Flink / Kafka are you using ?

 Can you show the snippet of code where you create the DataStream ?

 Cheers

 On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <
 flinken...@gmail.com> wrote:

> Folks,
>
> I have a KafkaConsumer that I am trying to read messages from. When I
> try to create a DataStream from the KafkConsumer (env.addSource()) I get
> the following exception :
>
> Any idea on how can this happen?
>
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
>
>

>>>
>>
>


Re: Null Pointer Exception on Trying to read a message from Kafka

2017-08-28 Thread Sridhar Chellappa
1.3.0

On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu  wrote:

> Which Flink version are you using (so that line numbers can be matched
> with source code) ?
>
> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa 
> wrote:
>
>> DataStream MyKafkaMessageDataStream = env.addSource(
>> getStreamSource(env, parameterTool);
>> );
>>
>>
>>
>> public RichParallelSourceFunction
>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>> parameterTool) {
>>
>>// MyKAfkaMessage is a ProtoBuf message
>> 
>> env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
>> ProtobufSerializer.class);
>>
>> KafkaDataSource flinkCepConsumer =
>> new KafkaDataSource(parameterTool,
>> new MyKafkaMessageSerDeSchema());
>>
>> return flinkCepConsumer;
>> }
>>
>>
>> public class KafkaDataSource extends FlinkKafkaConsumer010 {
>>
>> public KafkaDataSource(ParameterTool parameterTool,
>> DeserializationSchema deserializer) {
>> super(
>> Arrays.asList(parameterTool.getRequired("topic").split(",")),
>> deserializer,
>> parameterTool.getProperties()
>> );
>>
>> }
>>
>> }
>>
>> public class MyKafkaMessageSerDeSchema implements
>> DeserializationSchema, SerializationSchema
>> {
>>
>> @Override
>> public MyKafkaMessage deserialize(byte[] message) throws IOException {
>> MyKafkaMessage MyKafkaMessage = null;
>> try {
>> MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
>> } catch (InvalidProtocolBufferException e) {
>> e.printStackTrace();
>> } finally {
>> return MyKafkaMessage;
>> }
>> }
>>
>> @Override
>> public boolean isEndOfStream(MyKafkaMessage nextElement) {
>> return false;
>> }
>>
>> @Override
>> public TypeInformation getProducedType() {
>> return null;
>> }
>>
>> @Override
>> public byte[] serialize(MyKafkaMessage element) {
>> return new byte[0];
>> }
>> }
>>
>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu  wrote:
>>
>>> Which version of Flink / Kafka are you using ?
>>>
>>> Can you show the snippet of code where you create the DataStream ?
>>>
>>> Cheers
>>>
>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa >> > wrote:
>>>
 Folks,

 I have a KafkaConsumer that I am trying to read messages from. When I
 try to create a DataStream from the KafkConsumer (env.addSource()) I get
 the following exception :

 Any idea on how can this happen?

 java.lang.NullPointerException
at 
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at 
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at 
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at 
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at 
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at 
 org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at 
 org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
at 
 org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
at 
 org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
at 
 org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
at 
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
 org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)


>>>
>>
>


Re: Example build error

2017-08-28 Thread Ted Yu
Looking at:
https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/simple/StreamingWordCount.java

there is no line 56.

Which repo do you get StreamingWordCount from ?
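
In case it helps: that error usually appears when the source is created from
a batch ExecutionEnvironment instead of the streaming one. A minimal sketch of
the streaming variant (input path is a placeholder):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCountSketch {

    public static void main(String[] args) throws Exception {
        // StreamExecutionEnvironment.readTextFile returns a DataStream,
        // whereas ExecutionEnvironment.readTextFile returns a batch DataSource.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.readTextFile("file:///path/to/input"); // placeholder path

        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.toLowerCase().split("\\W+")) {
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<>(word, 1));
                            }
                        }
                    }
                })
                .keyBy(0)
                .sum(1);

        counts.print();
        env.execute("streaming-word-count-sketch");
    }
}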

On Mon, Aug 28, 2017 at 3:58 PM, Jakes John 
wrote:

> When I am trying to build and run streaming wordcount example(example in
> the flink github), I am getting the following error
>
> StreamingWordCount.java:[56,59]   incompatible types:
> org.apache.flink.api.java.operators.DataSource cannot be converted to
> org.apache.flink.streaming.api.datastream.DataStream<java.lang.String>
>
>
> Did any of the APIs change? I am building with the latest Flink.
>
>
>   
>
> UTF-8
>
> 1.3.2
>
> 1.7.7
>
> 1.2.17
>
> 2.10
>
> 
>
>
> Thanks,
>
> Johnu
>


Example build error

2017-08-28 Thread Jakes John
When I am trying to build and run streaming wordcount example(example in
the flink github), I am getting the following error

StreamingWordCount.java:[56,59]   incompatible types:
org.apache.flink.api.java.operators.DataSource cannot be
converted to
org.apache.flink.streaming.api.datastream.DataStream<java.lang.String>


Did any of the APIs change? I am building with the latest Flink.


  

UTF-8

1.3.2

1.7.7

1.2.17

2.10




Thanks,

Johnu


Re: Flink Elastic Sink AWS ES

2017-08-28 Thread ant burton
Hey Arpit,

> _cat/nodes?v=ip,port


returns the following (I have not added the x's; they were returned in the
response):

ip      port
x.x.x.x 9300

Thank you for your help

Anthony


> On 28 Aug 2017, at 10:34, arpit srivastava  wrote:
> 
> Hi Ant,
> 
> Can you try this.
> 
> curl -XGET 'http:///_cat/nodes?v=ip,port'
> 
> This should give you ip and port
> 
> On Mon, Aug 28, 2017 at 3:42 AM, ant burton  > wrote:
> Hi Arpit,
> 
>> The response from _nodes doesn't contain an IP address in my case. Is this
>> something that you experienced?
> 
>> curl -XGET 'http:///_nodes'
> 
> Thanks,
> 
> 
>> On 27 Aug 2017, at 14:32, ant burton > > wrote:
>> 
>> Thanks! I'll check later this evening.
>> 
>> On Sun, 27 Aug 2017 at 07:44, arpit srivastava > > wrote:
>> We also had the same setup, where the ES cluster was behind a proxy server
>> that listened on port 80 and redirected to port 9200 on the ES cluster.
>> 
>> For using Flink, we got the actual IP addresses of the ES nodes and put
>> them in the addresses below.
>> 
>> transportAddresses.add(new 
>> InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
>> transportAddresses.add(new 
>> InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
>> But this worked only because port 9300 was open on the ES nodes in our setup
>> and so accessible from our Flink cluster.
>> 
>> Get your node list on your ES Cluster using
>> curl -XGET 'http:///_nodes'
>> 
>> and then check whether you can telnet to that  on port 9300
>> from your Flink cluster nodes
>> 
>> $ telnet  9300
>> 
>> If this works then you can use the above solution.
>> 
>> 
>> On Sun, Aug 27, 2017 at 4:09 AM, ant burton > > wrote:
>> Hi Ted,
>> 
>> Changing the port from 9300 to 9200 in the example you provided causes the
>> error in my original message.
>> 
>> My apologies for not providing context in the form of code in my original
>> message. To confirm, I am using the example you provided in my application
>> and have it working on port 9300 in a local Docker environment.
>> 
>> Thanks,
>> 
>>> On 26 Aug 2017, at 23:24, Ted Yu >> > wrote:
>>> 
>>> If port 9300 in the following example is replaced by 9200, would that work ?
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/elasticsearch.html
>>>  
>>> 
>>> 
>>> Please use Flink 1.3.1+
>>> 
>>> On Sat, Aug 26, 2017 at 3:00 PM, ant burton >> > wrote:
>>> Hello,
>>> 
>>> Has anybody been able to use the Flink Elasticsearch connector to sink data
>>> to AWS ES?
>>> 
>>> I don't believe this is possible, as AWS ES only allows access to port 9200
>>> (via port 80) on the master node of the ES cluster, and not port 9300 used
>>> by the Flink Elasticsearch connector.
>>> 
>>> The error message that occurs when attempting to connect to AWS ES via port 
>>> 80 (9200) with the Flink Elasticsearch connector is:
>>> 
>>> Elasticsearch client is not connected to any Elasticsearch nodes!
>>> 
>>> Could anybody confirm the above, and if possible provide an alternative
>>> solution?
>>> 
>>> Thank you,
>>> 
>> 
>> 
> 
> 



Union limit

2017-08-28 Thread boci
Hi guys!

I have one input (from MongoDB), and I split the incoming data into multiple
datasets (each created dynamically from configuration). Before I write back
the result, I want to merge them into one dataset (there is some common
transformation).
So the flow is:

DataSet from MongoDB =>
Create mappers dynamically (currently 74), so I have 74 DataSets =>
Custom filter and mapping on each dataset =>
Union dynamically into one (every mapper result is the same type) =>
Some other common transformation =>
Count the result

But when I want to union more than 64 datasets, I get this exception:

Exception in thread "main" org.apache.flink.optimizer.CompilerException:
Cannot currently handle nodes with more than 64 outputs.
at
org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
at
org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
at
org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
at
org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)

I tried to split the incoming list of 74 datasets into groups of 60 + 14,
insert an identity mapper after each group, and union the resulting
datasets, but with no success:

val listOfDataSet: List[DataSet[...]] = 

listOfDataSet
  .sliding(60, 60)
  .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper()))
  // There is now an iterator of DataSets
  .reduce((dsg1, dsg2) => dsg1.union(dsg2)) // Here I get the exception
  .map(finalDataSet => ... some transformation ...)
  .count()

Is there any solution to this?

Thanks
b0c1


Default chaining & uid

2017-08-28 Thread Emily McMahon
Does setting a uid affect the default chaining (i.e. if I have two maps in a
row and set a uid on both)?

This makes me think there's no effect:

> All operators that are part of a chain should be assigned an ID as
> described in the Matching Operator State section above.
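
For concreteness, the pattern I'm asking about looks like this (a sketch with
placeholder mappers):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidChainingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("a", "b", "c"); // placeholder source

        input
            .map(new Upper()).uid("upper-map")    // uid names the operator for savepoint state
            .map(new Suffix()).uid("suffix-map")  // second consecutive map, also with a uid
            .print();

        env.execute("uid-chaining-sketch");
    }

    private static class Upper implements MapFunction<String, String> {
        @Override
        public String map(String value) {
            return value.toUpperCase();
        }
    }

    private static class Suffix implements MapFunction<String, String> {
        @Override
        public String map(String value) {
            return value + "!";
        }
    }
}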



Thanks,
Emily


Re: Even out the number of generated windows

2017-08-28 Thread Bowen Li
That's exactly what I found yesterday! Thank you Aljoscha for confirming it!

On Mon, Aug 28, 2017 at 2:57 AM, Aljoscha Krettek 
wrote:

> Hi Bowen,
>
> There is no built-in TTL, but you can use a ProcessFunction to set a timer
> that clears state.
>
> ProcessFunction docs: https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/stream/process_function.html
>
> Best,
> Aljoscha
>
> On 27. Aug 2017, at 19:19, Bowen Li  wrote:
>
> Hi Robert,
> Thank you for the suggestion, I'll try that.
>
> On second thought, I can actually reduce the amount of generated
> output so there aren't that many records being sent to Kinesis.
>
> What I want to do is to use Flink's state to keep track of the last
> computation result of a window for each key. If the latest computation
> result is the same as the last one, my Flink job shouldn't emit a new
> record. However, that requires some expiration functionality so that the
> state won't grow indefinitely, as explained in
> https://issues.apache.org/jira/browse/FLINK-3089. Is there any way to
> expire keyed state by time?
>
> Thanks,
> Bowen
>
>
>
> On Sun, Aug 27, 2017 at 5:41 AM, Robert Metzger 
> wrote:
>
>> Hi Bowen,
>>
>> I don't know what kind of relationship your company has to AWS, maybe
>> they are willing to look into the issue from their side.
>>
>> To throttle a stream, I would recommend just doing a map operation that
>> is calling  "Thread.sleep()" every n records.
>>
>> On Sat, Aug 26, 2017 at 4:11 AM, Bowen Li 
>> wrote:
>>
>>> Hi Robert,
>>> We use kinesis sink (FlinkKinesisProducer). The main pain is the Kinesis
>>> Producer Library (KPL) that FlinkKinesisProducer uses.
>>>
>>> KPL is basically a java wrapper with a c++ core. It's slow, unstable,
>>> easy to crash, memory-and-CPU-consuming (it sends traffic via HTTP), and
>>> can't handle high workload like a few million records at a short period of
>>> time. Well, in order to write to Kinesis, there's no other options except
>>> KPL (AWS Kinesis SDK is even slower), so I'm not blaming Flink chose KPL.
>>>
>>> Are there any recommended ways to "artificially throttle down the
>>> stream before the sink"? How to add the throttling into Flink's fluent
>>> API?
>>>
>>> Thanks,
>>> Bowen
>>>
>>>
>>> On Fri, Aug 25, 2017 at 2:31 PM, Robert Metzger 
>>> wrote:
>>>
 Hi Bowen,

 (very nice graphics :) )

 I don't think you can do anything about the windows itself (unless you
 are able to build the windows yourself using the ProcessFunction, playing
 some tricks because you know your data), so I should focus on reducing the
 pain in "burning down your sink".
 Are there any issues with the Sink by the spikes? (What's the
 downstream system?)
 Does it make sense for you to artificially throttle down the stream
 before the sink, so that the records per second get limited to a certain
 rate. Since you are using Event time, the window results will always be
 correct & consistent. From a business perspective, this will of course
 introduce additional latency (= results come in later).


 On Fri, Aug 25, 2017 at 6:23 AM, Bowen Li 
 wrote:

> Hi guys,
>
> I do have a question for how Flink generates windows.
>
> We are using a 1-day sized sliding window with 1-hour slide to count
> some features of items based on event time. We have about 20million items.
> We observed that Flink only emit results on a fixed time in an hour (e.g.
> 1am, 2am, 3am,  or 1:15am, 2:15am, 3:15am with a 15min offset). That's
> means 20million windows/records are generated at the same time every hour,
> which burns down our sink. But nothing is generated in the rest of that
> hour. The pattern is like this:
>
> # generated windows
> |
> |/\  /\
> |   /  \/  \
> |_/__\___/__\_
>  time
>
> Is there any way to even out the number of generated windows/records
> in an hour? Can we have evenly distributed generated load like this?
>
> # generated windows
> |
> |
> | 
> |___
>  time
>
> Thanks,
> Bowen
>
>

>>>
>>
>
>
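
For reference, the ProcessFunction-plus-timer approach mentioned above could
look roughly like the sketch below (names and the TTL value are placeholders;
it assumes event-time timestamps are assigned, and a production version would
track the latest timer to avoid clearing freshly updated state):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Emits a value only if it differs from the last one emitted for the key,
// and arms an event-time timer that clears the per-key state after a TTL.
public class DedupWithStateTtl extends ProcessFunction<String, String> {

    private static final long TTL_MILLIS = 60 * 60 * 1000L; // placeholder TTL: 1 hour

    private transient ValueState<String> lastEmitted;

    @Override
    public void open(Configuration parameters) {
        lastEmitted = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-emitted", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (!value.equals(lastEmitted.value())) {
            lastEmitted.update(value);
            out.collect(value);
        }
        // Timers are scoped to the current key; this requires a keyed stream.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TTL_MILLIS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Naive cleanup: drops the state whenever a timer for this key fires.
        lastEmitted.clear();
    }
}

Applied as stream.keyBy(...).process(new DedupWithStateTtl()).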


Re: Issues in recovering state from last crash using custom sink

2017-08-28 Thread vipul singh
Hi Aljoscha,

Yes.
I am running the application till a few checkpoints are complete. I am
stopping the application between two checkpoints, so there will be messages
in the list state, which should be checkpointed when snapshotState is
called. I am able to see a checkpoint file on S3 (I am saving the
checkpoints on S3 using the RocksDB state backend). On restarting the
application, I add a debug point here to see if there are any messages in
checkpointedMessages, but the list is empty.

Do you think there might be an error in the way I am trying to retrieve
messages?


def snapshotState(context: FunctionSnapshotContext) {
  checkpointedMessages.clear()
  bufferredMessages.foreach(checkpointedMessages.add)
  pendingFiles synchronized {
    if (pendingFiles.nonEmpty) {
      // we have a list of pending files
      // we move all files to S3 (that's the sink in our case)
      // and after that we delete these files
    }
    pendingFiles.clear()
  }
}

def initializeState(context: FunctionInitializationContext) {
  // Check if files already exist in /tmp
  // this might be the case if the program crashed before these files were
  // uploaded to S3
  // We need to recover (upload these files to S3 and clear the directory)
  handlePreviousPendingFiles()
  checkpointedMessages = context.getOperatorStateStore.getListState(new
    ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new
    TypeHint[Message]() {})))
  import scala.collection.JavaConversions._
  for (message <- checkpointedMessages.get) {
    bufferredMessages.add(message)
  }
}

From my understanding of the above code, upon checkpointing, the messages
contained in checkpointedMessages are in the snapshot, and when
initializeState is called, it will try to recover these messages from the
last checkpoint?
Do you think the error is in the way I am trying to get the last checkpoint
ListBuffer elements?
checkpointedMessages = context.getOperatorStateStore.getListState(new
ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new
TypeHint[Message]() {})))

Please let me know!

Thanks,
Vipul

On Mon, Aug 28, 2017 at 2:52 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> How are you testing the recovery behaviour? Are you taking a savepoint
> ,then shutting down, and then restarting the Job from the savepoint?
>
> Best,
> Aljoscha
>
> On 28. Aug 2017, at 00:28, vipul singh  wrote:
>
> Hi all,
>
> I am working on a flink archiver application. In a gist this application
> tries to reads a bunch of schematized messages from kafka and archives them
> to s3. Due to the nature of the naming of the files, I had to go towards a
> custom sink implementation. As of the current progress the application in
> general is able to archive files to s3 ok.
> I am having some issues during the recovery phase. A sample of the code
> can be found on the link. My issue is that on recovery, when
> initializeState is called, it is not able to get (recover) the last
> checkpointed ListState (i.e. checkpointedMessages is 0). I think this
> might be because of the way I am retrieving the
> checkpointed messages. Could someone please point me to what is wrong, or
> direct me to some examples which do a similar thing? (Please note the
> Message class is our own implementation.)
>
> Thanks,
> Vipul
>
>
>


-- 
Thanks,
Vipul


Flink Yarn Session failures

2017-08-28 Thread Chan, Regina
Hi,

I was trying to understand why it takes about 9 minutes between the last
attempt to start a container and when the YarnApplicationMasterRunner
finally gets the SIGTERM that kills it.

Client:



Calc Engine: 2017-08-28 12:39:23,596 INFO  
org.apache.flink.yarn.YarnClusterClient   - Waiting until 
all TaskManagers have connected

Calc Engine: Waiting until all TaskManagers have connected

Calc Engine: 2017-08-28 12:39:23,600 INFO  
org.apache.flink.yarn.YarnClusterClient   - Starting client 
actor system.

Calc Engine: 2017-08-28 12:39:24,077 INFO  akka.event.slf4j.Slf4jLogger 
 - Slf4jLogger started

Calc Engine: 2017-08-28 12:39:24,366 INFO  Remoting 
 - Remoting started; listening on addresses 
:[akka.tcp://fl...@dlp-qa-176378-023.dc.gs.com:39353]

Calc Engine: 2017-08-28 12:39:24,609 INFO  
org.apache.flink.yarn.YarnClusterClient   - TaskManager 
status (0/4)

Calc Engine: TaskManager status (0/4)

Calc Engine: 2017-08-28 12:39:29,864 INFO  
org.apache.flink.yarn.YarnClusterClient   - TaskManager 
status (1/4)

Calc Engine: TaskManager status (1/4)

Calc Engine: 2017-08-28 12:39:30,389 INFO  
org.apache.flink.yarn.YarnClusterClient   - TaskManager 
status (2/4)

Calc Engine: TaskManager status (2/4)

Calc Engine: 2017-08-28 12:41:04,920 INFO  
org.apache.flink.yarn.YarnClusterClient   - TaskManager 
status (1/4)

Calc Engine: TaskManager status (1/4)

Calc Engine: 2017-08-28 12:41:13,775 INFO  
org.apache.flink.yarn.YarnClusterClient   - TaskManager 
status (0/4)

Calc Engine: TaskManager status (0/4)

Calc Engine: 2017-08-28 12:50:43,133 WARN  
akka.remote.ReliableDeliverySupervisor- Association 
with remote system [akka.tcp://fl...@d191303-019.dc.gs.com:58084] has failed, 
address is now gated for [5000] ms. Reason: [Disassociated]



Logs:


Container id: container_e71_1503688027943_30786_01_13

Exit code: 134

Stack trace: ExitCodeException exitCode=134:

at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)

at org.apache.hadoop.util.Shell.run(Shell.java:455)

at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)

at 
org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)

at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)

at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:262)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)



Shell output: main : command provided 1

main : user is delp

main : requested yarn user is delp



Container exited with a non-zero exit code 134



17/08/28 12:39:51 INFO yarn.YarnFlinkResourceManager: Total number of failed 
containers so far: 5

17/08/28 12:39:51 ERROR yarn.YarnFlinkResourceManager: Stopping YARN session 
because the number of failed containers (5) exceeded the maximum failed 
containers (4). This number is controlled by the 
'yarn.maximum-failed-containers' configuration setting. By default its the 
number of requested containers.

17/08/28 12:39:51 INFO yarn.YarnFlinkResourceManager: Shutting down cluster 
with status FAILED : Stopping YARN session because the number of failed 
containers (5) exceeded the maximum failed containers (4). This number is 
controlled by the 'yarn.maximum-failed-containers' configuration setting. By 
default its the number of requested containers.

17/08/28 12:39:51 INFO yarn.YarnFlinkResourceManager: Unregistering application 
from the YARN Resource Manager

17/08/28 12:39:51 INFO impl.AMRMClientImpl: Waiting for application to be 
successfully unregistered.

17/08/28 12:39:51 INFO impl.ContainerManagementProtocolProxy: Opening proxy : 
d191303-010.dc.gs.com:45454

17/08/28 12:39:51 INFO impl.AMRMClientAsyncImpl: Interrupted while waiting for 
queue

java.lang.InterruptedException

at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)

at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)

at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)

at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)

17/08/28 12:39:51 INFO impl.ContainerManagementProtocolProxy: Opening proxy : 

metrics for Flink sinks

2017-08-28 Thread Martin Eden
Hi all,

Just 3 quick questions both related to Flink metrics, especially around
sinks:

1. In the Flink UI, sources always show 0 input records/bytes and sinks
always show 0 output records/bytes. Why is that?

2. What is the best practice for instrumenting off-the-shelf Flink sinks?

Currently the only metrics available are the number of records/bytes in and
out at the operator and task scope. For the task scope there are extra buffer
metrics. However, the output metrics are always zero (see question 1). How
can one know the actual number of successful writes done by an off-the-shelf
Flink sink? Or the latency of the write operation?

3. Is it possible to configure Flink to get global job metrics for all
subtasks of an operator? Or are there any best practices around that?
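
For reference, one partial workaround for 2. could be to count records
immediately before the sink with an explicit metric; a sketch (the metric and
class names are made up), which only measures what reaches the sink, not what
was successfully written:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Identity map placed directly before the sink; counts records handed to it.
public class SinkInputCounter<T> extends RichMapFunction<T, T> {

    private transient Counter recordsTowardsSink;

    @Override
    public void open(Configuration parameters) {
        recordsTowardsSink = getRuntimeContext()
                .getMetricGroup()
                .counter("recordsTowardsSink"); // placeholder metric name
    }

    @Override
    public T map(T value) {
        recordsTowardsSink.inc();
        return value;
    }
}

Usage (sketch): stream.map(new SinkInputCounter<MyType>()).addSink(mySink)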

Thanks,
M


Re: Null Pointer Exception on Trying to read a message from Kafka

2017-08-28 Thread Ted Yu
Which Flink version are you using (so that line numbers can be matched with
source code) ?

On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa 
wrote:

> DataStream MyKafkaMessageDataStream = env.addSource(
> getStreamSource(env, parameterTool);
> );
>
>
>
> public RichParallelSourceFunction 
> getStreamSource(StreamExecutionEnvironment
> env, ParameterTool parameterTool) {
>
>// MyKAfkaMessage is a ProtoBuf message
> 
> env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
> ProtobufSerializer.class);
>
> KafkaDataSource flinkCepConsumer =
> new KafkaDataSource(parameterTool,
> new MyKafkaMessageSerDeSchema());
>
> return flinkCepConsumer;
> }
>
>
> public class KafkaDataSource extends FlinkKafkaConsumer010 {
>
> public KafkaDataSource(ParameterTool parameterTool,
> DeserializationSchema deserializer) {
> super(
> Arrays.asList(parameterTool.getRequired("topic").split(","
> )),
> deserializer,
> parameterTool.getProperties()
> );
>
> }
>
> }
>
> public class MyKafkaMessageSerDeSchema implements 
> DeserializationSchema,
> SerializationSchema {
>
> @Override
> public MyKafkaMessage deserialize(byte[] message) throws IOException {
> MyKafkaMessage MyKafkaMessage = null;
> try {
> MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
> } catch (InvalidProtocolBufferException e) {
> e.printStackTrace();
> } finally {
> return MyKafkaMessage;
> }
> }
>
> @Override
> public boolean isEndOfStream(MyKafkaMessage nextElement) {
> return false;
> }
>
> @Override
> public TypeInformation getProducedType() {
> return null;
> }
>
> @Override
> public byte[] serialize(MyKafkaMessage element) {
> return new byte[0];
> }
> }
>
> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu  wrote:
>
>> Which version of Flink / Kafka are you using ?
>>
>> Can you show the snippet of code where you create the DataStream ?
>>
>> Cheers
>>
>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa 
>> wrote:
>>
>>> Folks,
>>>
>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>> try to create a DataStream from the KafkConsumer (env.addSource()) I get
>>> the following exception :
>>>
>>> Any idea on how can this happen?
>>>
>>> java.lang.NullPointerException
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>> at 
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>> at 
>>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>> at 
>>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>> at 
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>
>


Re: Null Pointer Exception on Trying to read a message from Kafka

2017-08-28 Thread Sridhar Chellappa
DataStream MyKafkaMessageDataStream = env.addSource(
        getStreamSource(env, parameterTool)
);



public RichParallelSourceFunction
getStreamSource(StreamExecutionEnvironment env, ParameterTool
parameterTool) {

   // MyKAfkaMessage is a ProtoBuf message

env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
ProtobufSerializer.class);

KafkaDataSource flinkCepConsumer =
new KafkaDataSource(parameterTool, new
MyKafkaMessageSerDeSchema());

return flinkCepConsumer;
}


public class KafkaDataSource extends FlinkKafkaConsumer010 {

public KafkaDataSource(ParameterTool parameterTool,
DeserializationSchema deserializer) {
super(

Arrays.asList(parameterTool.getRequired("topic").split(",")),
deserializer,
parameterTool.getProperties()
);

}

}

public class MyKafkaMessageSerDeSchema implements
DeserializationSchema, SerializationSchema {

@Override
public MyKafkaMessage deserialize(byte[] message) throws IOException {
MyKafkaMessage MyKafkaMessage = null;
try {
MyKafkaMessage =  MyKafkaMessage.parseFrom(message);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
} finally {
return MyKafkaMessage;
}
}

@Override
public boolean isEndOfStream(MyKafkaMessage nextElement) {
return false;
}

@Override
public TypeInformation getProducedType() {
return null;
}

@Override
public byte[] serialize(MyKafkaMessage element) {
return new byte[0];
}
}

On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu  wrote:

> Which version of Flink / Kafka are you using ?
>
> Can you show the snippet of code where you create the DataStream ?
>
> Cheers
>
> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa 
> wrote:
>
>> Folks,
>>
>> I have a KafkaConsumer that I am trying to read messages from. When I try
>> to create a DataStream from the KafkConsumer (env.addSource()) I get the
>> following exception :
>>
>> Any idea on how can this happen?
>>
>> java.lang.NullPointerException
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>
>


Re: Sink - Cassandra

2017-08-28 Thread nragon
Nick,

Can you send some of your examples using phoenix?

Thanks



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Sink-Cassandra-tp4107p15197.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


CoGroupedStreams.WithWindow sideOutputLateData and allowedLateness

2017-08-28 Thread Yunus Olgun
Hi,

WindowedStream has sideOutputLateData and allowedLateness methods to handle
late data. Similar functionality on CoGroupedStreams would have been nice.
As it is, late data is silently ignored, which is error-prone.

- Is there a reason it does not exist?
- Any suggested workaround?


Off heap memory issue

2017-08-28 Thread Javier Lopez
Hi all,

We are starting a lot of (streaming) Flink jobs, and after we have started
200 or more jobs we see that the non-heap memory in the TaskManagers
increases a lot, to the point of killing the instances. We found out that
every time we start a new job, the committed non-heap memory increases by 5
to 10 MB. Is this expected behavior? Are there ways to prevent this?


Re: Null Pointer Exception on Trying to read a message from Kafka

2017-08-28 Thread Ted Yu
Which version of Flink / Kafka are you using ?

Can you show the snippet of code where you create the DataStream ?

Cheers

On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa 
wrote:

> Folks,
>
> I have a KafkaConsumer that I am trying to read messages from. When I try
> to create a DataStream from the KafkConsumer (env.addSource()) I get the
> following exception :
>
> Any idea on how can this happen?
>
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
>
>


Null Pointer Exception on Trying to read a message from Kafka

2017-08-28 Thread Sridhar Chellappa
Folks,

I have a KafkaConsumer that I am trying to read messages from. When I try
to create a DataStream from the KafkaConsumer (env.addSource()) I get the
following exception:

Any idea how this can happen?

java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)


Classloader issue with UDF's in DataStreamSource

2017-08-28 Thread Edward
I need help debugging a problem with using user defined functions in my
DataStreamSource code.

Here's the behavior:
The first time I upload my jar to the Flink cluster and submit the job, it
runs fine.
For any subsequent runs of the same job, it gives me a NoClassDefFoundError
on one of my UDF classes.
If I restart the Flink cluster, it will again work, but only the first
time I submit the job.

I am using a customized KafkaAvroDeserializer where the reader schema is
different from the writer schema (and where that reader schema is a
generated Avro class which is included in my uploaded jar file). If I
change my code to use the standard KafkaAvroDeserializer (i.e. no UDF's in
the DataStreamSource), it works fine, even though there are UDF's used in
other steps of my job, so the problem seems specific to the DataStreamSource
step.

Why would the classloader here not have access to all classes in my uploaded
jar file, while the classloader used in subsequent steps does have access to
that jar file? Why would it work fine the first time I upload the jar via
the Flink Dashboard, but not on subsequent executions?

Here's the exception I'm seeing:








--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Classloader-issue-with-UDF-s-in-DataStreamSource-tp15192.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Question about windowing

2017-08-28 Thread Aljoscha Krettek
Yes, this is a very good explanation, Tony!

I'd like to add that "Evictor" is not really a good name for what it does. It 
should be more like "Keeper" or "Retainer" because what a 
"CountEvictor.of(1000)" really does is to evict everything but the last 1000 
elements, so it should be called "CountRetainer.of(1000)". 

(The names Trigger and Evictor were originally inspired by IBM InfoSphere 
Streams, AFAIK.)
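
For reference, here is a minimal sketch (Flink 1.3 DataStream API, Java) of how the
countWindow shorthand lines up with the explicit GlobalWindows/evictor/trigger form
from Jerry's question; the input stream and the sum() aggregation are placeholders.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class CountWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> stream =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

        // Shorthand: sliding count window of 1000 elements, sliding every 100 elements.
        stream.keyBy(0).countWindow(1000, 100).sum(1);

        // Roughly the same wiring spelled out: keep at most the last 1000 elements
        // per key in a single global window and fire the window every 100 elements.
        stream.keyBy(0)
              .window(GlobalWindows.create())
              .evictor(CountEvictor.of(1000))
              .trigger(CountTrigger.of(100))
              .sum(1);

        env.execute("count-window-sketch");
    }
}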

Best,
Aljoscha

> On 23. Aug 2017, at 09:56, 魏偉哲  wrote:
> 
> Hi Jerry,
> 
> You can learn about Flink's windowing mechanics in this blog 
> (https://flink.apache.org/news/2015/12/04/Introducing-windows.html 
> ).
> 
> To my understanding, window() defines how Flink uses a WindowAssigner to assign 
> an element to the right windows, trigger() defines when to fire a window, and 
> evictor() defines which elements in a window should be passed to the evaluation 
> function.
> 
> Therefore, for time duration windows it is natural to use window() to describe 
> which windows an element should be assigned to, and to trigger by processing 
> time.
> For the count window, we actually need to count the number of elements, so we 
> insert all elements into a single global window, trigger every 100 elements, 
> and keep only the last 1000 elements for the next evaluation function.
> 
> One more thing: in a sliding count window, each element is placed in only one 
> window, but in a sliding time window an element is duplicated and inserted 
> into multiple windows.
> Using your case as an example, an element would be placed into five different 
> windows, each representing a different time range.
> 
> Hope this will help you.
> 
> Regards,
> Tony Wei
> 
> 
> 
> 2017-08-23 8:22 GMT+08:00 Jerry Peng  >:
> Hello,
> 
> I have a question regarding windowing and triggering.  I am trying to
> connect the dots between the simple windowing api e.g.
> 
> stream.countWindow(1000, 100)
> 
> to the underlying representation using triggers and evictors api:
> 
> stream.window(GlobalWindows.create())
>   .evictor(CountEvictor.of(1000))
>   .trigger(CountTrigger.of(100))
> 
> 
> how is the above equivalent to the semantics of a window with a window
> length of 1000 tuples and a sliding interval of 100 tuples?
> 
> And for time duration windows:
> 
> stream.timeWindow(Time.seconds(5), Time.seconds(1))
> 
> which maps to:
> 
> stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), 
> Time.seconds(1)))
>   .trigger(ProcessingTimeTrigger.create())
> 
> why isn't it mapped to something like:
> 
> stream.window(SlidingProcessingTimeWindows.create())
>   .trigger(ProccessingTimeTrigger.of(1))
>   .evictor(TimeEvictor.of(5))
> 
> ?
> 
> Thanks for any help in advance!
> 
> Best,
> 
> Jerry
> 



Re: Database connection from job

2017-08-28 Thread Aljoscha Krettek
Hi Bart,

I think you might be interested in the (admittedly short) section of the doc 
about RichFunctions: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#rich-functions
 


If you make your user function a RichFunction you can implement the lifecycle 
methods open() and close() that allow you to set up, for example, a database 
connection that you want to reuse for the lifetime of your user function.
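
As an illustration only (not from the thread): a minimal sketch of a RichMapFunction
that opens a JDBC connection in open() and closes it in close(). The JDBC URL,
credentials, query and field names are made up for the example.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichFromDb extends RichMapFunction<String, String> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel task instance on the worker, after the function
        // has been deserialized, so non-serializable resources belong here.
        connection = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/mydb", "user", "secret");
        statement = connection.prepareStatement("SELECT extra FROM lookup WHERE id = ?");
    }

    @Override
    public String map(String eventId) throws Exception {
        statement.setString(1, eventId);
        try (ResultSet rs = statement.executeQuery()) {
            // Enrich the event with the looked-up value, or pass it through unchanged.
            return rs.next() ? eventId + "," + rs.getString("extra") : eventId;
        }
    }

    @Override
    public void close() throws Exception {
        // Called when the task shuts down; release what open() acquired.
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}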

Best,
Aljoscha

> On 24. Aug 2017, at 17:42, Stefan Richter  wrote:
> 
> Hi,
> 
> the lifecycle is described here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/task_lifecycle.html
>  
> 
> 
> Best,
> Stefan
> 
>> Am 24.08.2017 um 14:12 schrieb Bart Kastermans > >:
>> 
>> I am using the scala api for Flink, and am trying to set up a JDBC
>> database connection
>> in my job (on every incoming event I want to query the database to get
>> some data
>> to enrich the event).  Because of the serialization and deserialization
>> of the code as
>> it is send from the flink master to the flink workers I cannot just open
>> the connection
>> in my main method.  Can someone give me a pointer to the lifecycle
>> methods that
>> are called by the worker to do local initialization of the job?  I have
>> not yet been able
>> to find any references or examples of this in the documentation.
>> 
>> Thanks!
>> 
>> Best,
>> Bart
> 



Re: Question about watermark and window

2017-08-28 Thread Aljoscha Krettek
Hi Tony,

I think your analyses are correct. Especially, yes, if you re-read the data, the 
(ts=3) record should still be considered late if both consumers read at the 
same speed. If, however, (ts=3) is read before the other consumer reads (ts=8), 
then it should not be considered late, as you said.
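
For context (not part of the original thread), a minimal runnable sketch of the kind
of pipeline being discussed, with the Kafka source replaced by a fixed element list;
the timestamps mirror Tony's example and the 5-unit tumbling window uses milliseconds
purely for illustration.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Stand-in for the Kafka source: (key, client timestamp) events.
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("b", 2L),
                Tuple2.of("a", 8L), Tuple2.of("b", 9L), Tuple2.of("b", 3L));

        events
            // Ascending extractor: the watermark follows the largest timestamp seen so far,
            // so an element arriving after a later one (ts=3 after ts=9) is treated as late.
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                    return element.f1;
                }
            })
            .keyBy(0)
            // Tumbling event-time window of 5 time units (milliseconds here),
            // with the default behaviour of dropping late elements.
            .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
            .sum(1)
            .print();

        env.execute("watermark-window-sketch");
    }
}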

Best,
Aljoscha

> On 24. Aug 2017, at 15:49, Tony Wei  wrote:
> 
> Hi,
> 
> Recently, I studied about watermark from Flink documents and blogs.
> 
> I have some question about this scenario below.
> 
> Suppose there are five clients sending events with different time to the 
> topic on Kafka.
> Topic has two partitions and five events' timestamp are (ts=1), (ts=2), 
> (ts=3), (ts=8), (ts=9).
> The Flink streaming job uses the following setting:
> 1. use AscendingTimestampExtractor
> 2. client time as timestamp
> 3. use tumbling window with 5 unit window size
> 4. not allow late event
> 
> If the client events out of order like this.
>   Partition A [(ts=1), (ts=8)]
>   Partition B [(ts=2), (ts=9)]  <= (ts=3) delay
> Should the window function emit [(ts=1), (ts=2)], keep [(ts=8), (ts=9)] in 
> state and drop (ts=3)?
> 
> If all events has come, and then replay the job from the beginning, the 
> partition state would be
>   Partition A [(ts=1), (ts=8)]
>   Partition B [(ts=2), (ts=9), (ts=3)]
> Suppose the two consumers fetch events at the same speed; should the result be the 
> same as above?
> If consumer B reads (ts=3) earlier than consumer A reads (ts=8), would (ts=3) 
> be placed in the window before the watermark advances to 8, and would [(ts=1), 
> (ts=2), (ts=3)] then be emitted as the result?
> 
> I wonder if those assumptions are all correct. If not, are there any mechanisms 
> of watermarks and windows in Flink that I missed?
> 
> Thanks for your help.
> 
> Best Regards,
> Tony Wei
> 



Re: Thoughts - Monitoring & Alerting if a Running Flink job ever kills

2017-08-28 Thread Aljoscha Krettek
Hi,

There is no built-in feature for this but you would use your metrics system for 
that, in my opinion.

Best,
Aljoscha

> On 26. Aug 2017, at 00:49, Raja.Aravapalli  wrote:
> 
> Hi, 
>  
> Is there a way to set up alerting for when a running Flink job dies, for any 
> reason?
>  
> Any thoughts please?
>  
>  
> Regards, 
> Raja.



Re: [Error]TaskManager -RECEIVED SIGNAL 1: SIGHUP. Shutting down as requested

2017-08-28 Thread Ted Yu
See http://docs.oracle.com/cd/E19253-01/816-5166/6mbb1kq04/index.html

Cheers

On Sun, Aug 27, 2017 at 11:47 PM, Samim Ahmed  wrote:

> Hello Ted Yu,
>
> Thanks for your response and a sincere apology for the late reply.
>
> OS version : Solaris10.
> Flink Version : flink-1.2.0-bin-hadoop2-scala_2.10
>
>
> Can you please guide me on how to get the "dmesg" log from the system?
>
> Please let me know if you need any other information and thanks in advance.
>
>
> On Thu, Aug 24, 2017 at 8:46 PM, Ted Yu  wrote:
>
>> Can you provide more information ?
>>
>> OS version
>> Flink version
>>
>> Anything interesting in dmesg output around this time ?
>>
>>
>> On Thu, Aug 24, 2017 at 4:53 AM, Samim Ahmed  wrote:
>>
>>> Hi All,
>>>
>>> For the last two days I have been getting the below error, and the worker
>>> servers are killed with the errors shown.
>>>
>>> Please guide me on how to fix this error. Earlier everything was running
>>> smoothly.
>>> I am using cluster mode with a configuration of five slave servers and one
>>> master.
>>>
>>>
>>> 2017-08-24 12:24:02,594 INFO  org.apache.flink.runtime.blob.BlobCache
>>> - Created BLOB cache storage directory
>>> /var/tmp/blobStore-1021d695-8bc1-4565-85bf-2552442a910f
>>> 2017-08-24 12:27:26,790 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager
>>>  - RECEIVED SIGNAL 1: SIGHUP. Shutting down as requested.
>>> 2017-08-24 12:27:26,793 INFO  org.apache.flink.runtime.blob.BlobCache
>>> - Shutting down BlobCache
>>> 2017-08-24 12:27:26,801 INFO  
>>> org.apache.flink.runtime.io.disk.iomanager.IOManager
>>>  - I/O manager removed spill file directory
>>> /eniq/backup/flink/tmp2/flink-io-1330f609-7400-4464-9318-a06ce049b0e0
>>>
>>> Thanks in advance.
>>>
>>>
>>> --
>>> Regards,
>>> Samim Ahmed
>>> Mumbai
>>> 09004259232
>>>
>>>
>>
>
>
> --
> Regards,
> Samim Ahmed
> Mumbai
> 09004259232
>
>


Re: Specific sink behaviour based on tuple key

2017-08-28 Thread Aljoscha Krettek
Hi,

The key is not directly available to a user function. You would have to use, 
within that function, the same code that you use for your KeySelector.
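
Purely as an illustration (the Tuple2 element type, field layout and base path are
assumptions, not from the thread): a sketch of a RichSinkFunction that re-derives the
key from the element, the same way a KeySelector would, and uses it to build the
per-key output path.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Assumes elements are Tuple2<String, String> and the job was keyed on f0.
public class PerKeyPathSink extends RichSinkFunction<Tuple2<String, String>> {

    private final String basePath;

    public PerKeyPathSink(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void invoke(Tuple2<String, String> value) throws Exception {
        // Same logic as the KeySelector used for keyBy(): the key is value.f0.
        String keyValue = value.f0;
        String targetPath = basePath + "/" + keyValue + "/";
        // Stand-in for the real write; plug in whatever file/storage client you use.
        System.out.println("writing " + value.f1 + " to " + targetPath);
    }
}

It would be attached after the keyBy, e.g. input.keyBy(...).addSink(new
PerKeyPathSink("./base/path")), with the path value being a placeholder here.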

Best,
Aljoscha

> On 26. Aug 2017, at 10:01, Alexis Gendronneau  wrote:
> 
> Hi all,
> 
> I am looking to customize a sink behaviour based on tuple key. 
> 
> My goal is to have each sink partition to write to a specific output. 
> 
> My job will be something like : 
> 
> input -> keyBy() -> outputSink
> 
> This output should be something like './base/path/keyValue/'.
> 
> But I was not able to find where the key value is stored; could you point me in 
> the right direction? :) 
> 
> -- 
> Alexis Gendronneau
> 
> alexis.gendronn...@corp.ovh.com 
> a.gendronn...@gmail.com 


Re: Even out the number of generated windows

2017-08-28 Thread Aljoscha Krettek
Hi Bowen,

There is no built-in TTL, but you can use a ProcessFunction to set a timer that 
clears state.

ProcessFunction docs: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html
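
Sketch only (not from the thread): a keyed ProcessFunction that emits a (key, result)
record only when the result differs from the last one emitted for that key, and uses
a processing-time timer to clear the per-key state after a made-up 24 hours so it
does not grow forever. A real job would also avoid re-registering a timer for every
element.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Apply on a keyed stream, e.g. windowResults.keyBy(0).process(new DedupWithTtl()).
public class DedupWithTtl extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long TTL_MS = 24 * 60 * 60 * 1000L; // assumption: 24 hours

    private transient ValueState<Long> lastEmitted;

    @Override
    public void open(Configuration parameters) {
        lastEmitted = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-emitted", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long previous = lastEmitted.value();
        if (previous == null || !previous.equals(value.f1)) {
            out.collect(value);            // only emit when the result changed
            lastEmitted.update(value.f1);
        }
        // Arm a processing-time timer that will clear the state for this key later.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + TTL_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // Simplistic expiry: forget the last emitted value for this key.
        lastEmitted.clear();
    }
}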

Best,
Aljoscha

> On 27. Aug 2017, at 19:19, Bowen Li  wrote:
> 
> Hi Robert,
> Thank you for the suggestion, I'll try that.
> 
> On second thought, I can actually reduce the amount of generated output 
> so there aren't that many records being sent to Kinesis.
> 
> What I want to do is to use Flink's state to keep track of the last 
> computation result of a window by each key. If the latest computation result 
> is the same as the last one, my Flink job shouldn't emit a new record. 
> However, that requires some expiration functionality so that the state won't 
> grow indefinitely, as explained in 
> https://issues.apache.org/jira/browse/FLINK-3089 
> . Are there anyway to 
> expire keyed state by time?
> 
> Thanks,
> Bowen
> 
> 
> 
> On Sun, Aug 27, 2017 at 5:41 AM, Robert Metzger  > wrote:
> Hi Bowen,
> 
> I don't know what kind of relationship your company has to AWS, maybe they 
> are willing to look into the issue from their side.
> 
> To throttle a stream, I would recommend just doing a map operation that is 
> calling  "Thread.sleep()" every n records.
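
For instance (sketch only; the class name, the every-n count and the sleep time are
made up), a pass-through map that sleeps briefly every n records so the rate reaching
the sink is capped, letting Flink's back-pressure slow down the operators upstream:

import org.apache.flink.api.common.functions.MapFunction;

public class ThrottlingMap<T> implements MapFunction<T, T> {

    private final int everyN;
    private final long sleepMillis;
    private long count = 0;

    public ThrottlingMap(int everyN, long sleepMillis) {
        this.everyN = everyN;
        this.sleepMillis = sleepMillis;
    }

    @Override
    public T map(T value) throws Exception {
        if (++count % everyN == 0) {
            Thread.sleep(sleepMillis); // pause briefly; upstream operators get back-pressured
        }
        return value;
    }
}

It would sit between the window operator and the sink, e.g.
results.map(new ThrottlingMap<>(1000, 50)).addSink(kinesisSink), where 1000 and 50
are placeholder values to tune.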
> 
> On Sat, Aug 26, 2017 at 4:11 AM, Bowen Li  > wrote:
> Hi Robert,
> We use kinesis sink (FlinkKinesisProducer). The main pain is the Kinesis 
> Producer Library (KPL) that FlinkKinesisProducer uses.
> 
> KPL is basically a Java wrapper with a C++ core. It's slow, unstable, easy to 
> crash, memory-and-CPU-consuming (it sends traffic via HTTP), and can't handle 
> a high workload like a few million records in a short period of time. Well, in 
> order to write to Kinesis, there are no other options except KPL (the AWS Kinesis 
> SDK is even slower), so I'm not blaming Flink for choosing KPL.
> 
> Are there any recommended ways to "artificially throttle down the stream 
> before the sink"? How to add the throttling into Flink's fluent API?
> 
> Thanks,
> Bowen
> 
> 
> On Fri, Aug 25, 2017 at 2:31 PM, Robert Metzger  > wrote:
> Hi Bowen,
> 
> (very nice graphics :) )
> 
> I don't think you can do anything about the windows themselves (unless you are 
> able to build the windows yourself using the ProcessFunction, playing some 
> tricks because you know your data), so I would focus on reducing the pain of 
> "burning down your sink".
> Are there any issues with the sink caused by the spikes? (What's the downstream 
> system?)
> Does it make sense for you to artificially throttle down the stream before 
> the sink, so that the records per second get limited to a certain rate? Since 
> you are using event time, the window results will always be correct & 
> consistent. From a business perspective, this will of course introduce 
> additional latency (= results come in later).
> 
> 
> On Fri, Aug 25, 2017 at 6:23 AM, Bowen Li  > wrote:
> Hi guys,
> 
> I do have a question for how Flink generates windows. 
> 
> We are using a 1-day sized sliding window with 1-hour slide to count some 
> features of items based on event time. We have about 20 million items. We 
> observed that Flink only emits results at a fixed time in each hour (e.g. 1am, 
> 2am, 3am, or 1:15am, 2:15am, 3:15am with a 15min offset). That means 
> 20 million windows/records are generated at the same time every hour, which 
> burns down our sink, but nothing is generated in the rest of that hour. The 
> pattern is like this:
> 
> # generated windows
> | 
> |/\  /\
> |   /  \/  \
> |_/__\___/__\_
>  time
> 
> Is there any way to even out the number of generated windows/records in an 
> hour? Can we have evenly distributed generated load like this? 
> 
> # generated windows
> | 
> | 
> | 
> |___
>  time 
> 
> Thanks,
> Bowen
> 
> 
> 
> 
> 



Re: Issues in recovering state from last crash using custom sink

2017-08-28 Thread Aljoscha Krettek
Hi,

How are you testing the recovery behaviour? Are you taking a savepoint, then 
shutting down, and then restarting the job from the savepoint?
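
In case it helps to compare against a known-good pattern: a minimal sketch of a
buffering sink using managed operator state (Flink 1.3 style), where initializeState
must copy the restored ListState back into the in-memory buffer. String stands in
for the custom Message type, and the flush-to-S3 step is only hinted at.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final List<String> buffer = new ArrayList<>();
    private transient ListState<String> checkpointedMessages;

    @Override
    public void invoke(String value) throws Exception {
        buffer.add(value);
        // ... flush the buffer to S3 once it is "full", then clear it ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Mirror the current buffer into the operator state on every checkpoint/savepoint.
        checkpointedMessages.clear();
        for (String message : buffer) {
            checkpointedMessages.add(message);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedMessages = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("messages", String.class));
        if (context.isRestored()) {
            for (String message : checkpointedMessages.get()) {
                buffer.add(message);   // repopulate the in-memory buffer on restore
            }
        }
    }
}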

Best,
Aljoscha

> On 28. Aug 2017, at 00:28, vipul singh  wrote:
> 
> Hi all,
> 
> I am working on a Flink archiver application. In a gist, this application 
> tries to read a bunch of schematized messages from Kafka and archives them 
> to S3. Due to the nature of the naming of the files, I had to go towards a 
> custom sink implementation. As of the current progress, the application in 
> general is able to archive files to S3 ok.
> I am having some issues during the recovery phase. A sample of the code can 
> be found on link 
> . My issue 
> is that on recovery, when initializeState is called, it is not able to get (recover) 
> the last checkpointed ListState (i.e. checkpointedMessages 
> 
>  is 0). I think this might be because of the way I am retrieving the 
> checkpointed messages. Could someone please point me to what is wrong, or 
> direct me to some examples which do a similar thing? (Please note the Message 
> 
>  class is our own implementation.)
> 
> Thanks,
> Vipul



Re: Flink Elastic Sink AWS ES

2017-08-28 Thread arpit srivastava
Hi Ant,

Can you try this.

curl -XGET 'http:///_cat/nodes?v=ip,port'

This should give you ip and port

On Mon, Aug 28, 2017 at 3:42 AM, ant burton  wrote:

> Hi Arpit,
>
> The response fromm _nodes doesn’t contain an ip address in my case. Is
> this something that you experienced?
>
> curl -XGET 'http:///_nodes'
>>
>>
> Thanks,
>
>
> On 27 Aug 2017, at 14:32, ant burton  wrote:
>
> Thanks! I'll check later this evening.
>
> On Sun, 27 Aug 2017 at 07:44, arpit srivastava 
> wrote:
>
>> We also had same setup where ES cluster was behind a proxy server for
>> which port 80 was used which redirected it to ES cluster 9200 port.
>>
>> For using Flink we got the actual ip address of the ES nodes and put that
>> in ips below.
>>
>> transportAddresses.add(new 
>> InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
>> 9300))transportAddresses.add(new 
>> InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
>>
>> But this worked only because 9300 port was open on ES nodes in our setup
>> and so accessible from our Flink cluster.​
>>
>> Get your node list on your ES Cluster using
>>
>> curl -XGET 'http:///_nodes'
>>
>>
>>
>> ​and then check whether you can telnet on that  on port 9300
>> from your flink cluster nodes
>>
>> $ *telnet  9300*
>>
>> If this works then you can use above solution.​
>>
>>
>> On Sun, Aug 27, 2017 at 4:09 AM, ant burton  wrote:
>>
>>> Hi Ted,
>>>
>>> Changing the port from 9300 to 9200 in the example you provides causes
>>> the error in the my original message
>>>
>>> my apologies for not providing context in the form of code in my
>>> original message, to confirm I am using the example you provided in my
>>> application and have it working using port 9300 in a docker environment
>>> locally.
>>>
>>> Thanks,
>>>
>>> On 26 Aug 2017, at 23:24, Ted Yu  wrote:
>>>
>>> If port 9300 in the following example is replaced by 9200, would that
>>> work ?
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.3/dev/connectors/elasticsearch.html
>>>
>>> Please use Flink 1.3.1+
>>>
>>> On Sat, Aug 26, 2017 at 3:00 PM, ant burton 
>>> wrote:
>>>
 Hello,

 Has anybody been able to use the Flink Elasticsearch connector to sink
 data to AWS ES.

 I don’t believe this is possible as AWS ES only allows access to port
 9200 (via port 80) on the master node of the ES cluster, and not port 9300
 used by the the Flink Elasticsearch connector.

 The error message that occurs when attempting to connect to AWS ES via
 port 80 (9200) with the Flink Elasticsearch connector is:

 Elasticsearch client is not connected to any Elasticsearch nodes!

 Could anybody confirm the above? and if possible provide an alternative
 solution?

 Thanks you,
>>>
>>>
>>>
>>>
>>
>


Re: The implementation of the RichSinkFunction is not serializable.

2017-08-28 Thread Federico D'Ambrosio
Hello everyone,

I solved my issue by using an Array[Byte] as a parameter, instead of the
explicit HTableDescriptor parameter. This way I can instantiate the
TableDescriptor inside the open method of OutputFormat using the static
method HTableDescriptor.parseFrom. In the end, marking conf, table and
connection as transient wouldn't make any difference.
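
To make the trick concrete, a small sketch in Java (the original code is Scala) of
the round trip described above, assuming the HBase 1.x client API used in the
thread; the table name is made up:

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;

public class DescriptorBytesSketch {
    public static void main(String[] args) throws Exception {
        // On the client: turn the (non-serializable) descriptor into plain bytes
        // and pass the byte array to the OutputFormat's constructor instead.
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("my_table"));
        byte[] descriptorBytes = descriptor.toByteArray();

        // On the worker, inside open()/configure(): rebuild the descriptor from the bytes.
        HTableDescriptor rebuilt = HTableDescriptor.parseFrom(descriptorBytes);
        System.out.println(rebuilt.getTableName());
    }
}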

Regards

2017-08-27 14:22 GMT+02:00 Federico D'Ambrosio <
federico.dambro...@smartlab.ws>:

> Hi,
>
> could you elaborate, please? Marking conf, connection and table as
> transient wouldn't help because of the presence of the HTableDescriptor
> reference?
>
> 2017-08-27 12:44 GMT+02:00 Jörn Franke :
>
>> It looks like that in your case everything should be serializable. An
>> alternative would be to mark certain non-serializable things as transient,
>> but as far as I see this is not possible in your case.
>>
>> On 27. Aug 2017, at 11:02, Federico D'Ambrosio <
>> federico.dambro...@smartlab.ws> wrote:
>>
>> Hi,
>>
>> I'm trying to write on HBase using writeOutputFormat using a custom HBase
>> format inspired from this example
>> 
>> in flink-hbase (mind you, I'm using Scala instead of Java) and encountering
>> the error reported in the mail object.
>>
>> Now, the OutputFormat I'm using is the following:
>>
>> abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor, 
>> confPath : Path) extends OutputFormat[T]{
>>
>>   private val LOG = LoggerFactory.getLogger(this.getClass)
>>
>>   var conf : org.apache.hadoop.conf.Configuration = _
>>   var connection : Connection = _
>>   var table : Table = _
>>   var taskNumber : String = _
>>
>>   @throws[IOException]
>>   def configure(parameters: Configuration): Unit = {
>> conf = HBaseConfiguration.create()
>> conf.addResource(confPath.getPath)
>> connection = ConnectionFactory.createConnection(conf)
>>   }
>>
>>
>>   @throws[IOException]
>>   def close(): Unit = {
>> table.close()
>>
>>   }
>>
>>
>>   @throws[IOException]
>>   def open(taskNumber: Int, numTasks: Int): Unit = {
>> this.taskNumber = String.valueOf(taskNumber)
>> val admin = connection.getAdmin
>>
>> if(!admin.tableExists(tableDescriptor.getTableName))
>>   admin.createTable(tableDescriptor)
>>
>> table = connection.getTable(tableDescriptor.getTableName)
>>
>>   }
>> }
>>
>> which is inherited by the actual format used, that implements the 
>> writeRecord method
>>
>>
>> class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath : Path)
>>   extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)
>>
>> with BatchContainer being
>>
>> case class BatchContainer(batch: Iterable[(String, String, String, Int)]) 
>> extends Serializable
>>
>> I'd like to ask you: what needs to be Serializable? As far as I see,
>> conf, connection and table are not Serializable and so they are surely part
>> of the issue. Are the constructor parameters, especially tableDescriptor
>> which is not Serializable, to be considered in this case? Should all the
>> methods implemented from the OutputFormat interface contain only
>> Serializable variables?
>>
>> Thank you for you attention,
>> Federico
>>
>>
>