Re: kafka consumer client seems not auto commit offset

2017-11-15 Thread Tzu-Li (Gordon) Tai
Hi Tony,

Thanks for the report. At first glance, what you described doesn’t seem to match the expected behavior.
I’ll spend some time later today to check this out.

Cheers,
Gordon


On 15 November 2017 at 5:08:34 PM, Tony Wei (tony19920...@gmail.com) wrote:

Hi Gordon,

When I used FlinkKafkaConsumer010 to consume logs from Kafka, I found that if I 
used `setStartFromLatest()` the Kafka consumer API didn't auto-commit offsets 
back to the consumer group, but if I used `setStartFromGroupOffsets()` it worked 
fine.

I am sure that the configuration for Kafka has `auto.commit.interval.ms = 5000` 
and `enable.auto.commit = true` and I didn't enable checkpointing.

The only difference is the change from `setStartFromGroupOffsets()` to 
`setStartFromLatest()`, yet the auto-commit mechanism just stopped working.
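
For reference, a minimal sketch of the setup I mean (topic name and properties here are illustrative, not my actual job):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-group");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "5000");

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
    consumer.setStartFromLatest();           // with this, no offsets get committed back
    // consumer.setStartFromGroupOffsets();  // with this instead, auto commit works as expected

    env.addSource(consumer);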

My Flink cluster version is 1.3.2.
My Kafka cluster version is 0.10.2.1.
My Zookeeper version for Kafka is 3.4.6-1569965, built on 02/20/2014 09:09 GMT.
My Kafka connector library is "org.apache.flink" % 
"flink-connector-kafka-0.10_2.10" % "1.3.2"

Thanks for your help in advance.

Best Regards,
Tony Wei

Re: What happened if my parallelism more than kafka partitions.

2017-11-08 Thread Tzu-Li (Gordon) Tai
The `KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)` method 
returns the index of the target subtask for a given Kafka partition.
The implementation in that method ensures that the same subtask index will 
always be returned for the same partition.

Each consumer subtask will locally invoke this assignment method for each Kafka 
partition.
If the returned subtask index doesn’t equal the subtask’s index, that partition 
will be filtered out and not be read by the subtask.
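
For illustration, here is a rough sketch of that local filtering (illustrative variable names; not the actual Flink source):

    for (KafkaTopicPartition partition : allPartitionsOfSubscribedTopics) {
        // every subtask computes the same target index for a given partition
        int targetSubtask = KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks);
        if (targetSubtask == indexOfThisSubtask) {
            subscribedPartitions.add(partition);   // this subtask reads the partition
        }
        // otherwise the partition is skipped; exactly one other subtask will claim it
    }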

On 8 November 2017 at 6:38:54 PM, yunfan123 (yunfanfight...@foxmail.com) wrote:

The code of the Kafka partition assignment is as follows:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the ids of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

It seems it could assign a partition to multiple subtasks.
I wonder how Flink ensures that some subtasks will simply remain idle.



--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-08 Thread Tzu-Li (Gordon) Tai
Hi Ashish,

From your description I do not yet have much of an idea of what may be 
happening.
However, some of your observations seem reasonable. I’ll go through them one 
by one:

I did try to modify request.timeout.ms, linger.ms, etc. to help with the issue if 
it were caused by a sudden burst of data or something along those lines. 
However, it caused the app to build up back pressure and become slower and 
slower until that timeout was reached.

If the client is experiencing trouble in writing outstanding records to Kafka, 
and the timeout is increased, then I think increased back pressure is indeed 
the expected behavior.

I noticed that the consumer fetch-rate drops tremendously while fetch-size grows 
exponentially BEFORE the producer actually starts to show higher response times 
and lower rates.

A drop in fetch-rate and growth in fetch-size in the Flink Kafka consumer should 
be a natural consequence of backpressure in the job.
The fetch loop in the consumer will be blocked temporarily when backpressure is 
propagated from downstream operators, resulting in longer fetch intervals and 
larger batches on each fetch (given that the event rate is still constant).
Therefore, I think the root cause is still along the lines of the producer side.

Would you happen to have any logs that might show useful information on 
the producer side?
I think we might have a better chance of finding out what is going on by 
digging there.
Also, which Flink version & Kafka version are you using?

Cheers,
Gordon
On 5 November 2017 at 11:24:49 PM, Ashish Pokharel (ashish...@yahoo.com) wrote:

All,  

I am starting to notice a strange behavior in a particular streaming app. I 
initially thought it was a Producer issue, as I was seeing timeout exceptions 
(records expiring in the queue). I did try to modify request.timeout.ms, linger.ms, 
etc. to help with the issue if it were caused by a sudden burst of data or 
something along those lines. However, it caused the app to build up back 
pressure and become slower and slower until that timeout was reached. With 
lower timeouts, the app would actually raise an exception and recover faster. I can 
tell it is not related to connectivity, as other apps are running just fine 
around the same time frame, connected to the same brokers (we have at least 10 
streaming apps connected to the same list of brokers) from the same data nodes.

We have enabled the Graphite Reporter in all of our applications. After deep diving 
into some of the consumer and producer stats, I noticed that the consumer fetch-rate 
drops tremendously while fetch-size grows exponentially BEFORE the producer 
actually starts to show higher response times and lower rates. Eventually, I 
noticed connection resets start to occur and connection counts go up 
momentarily, after which things get back to normal. Data producer rates remain 
constant around that timeframe - we have a Logstash producer sending data over. 
We checked both Logstash and Kafka metrics and they seem to be showing the same 
pattern (sort of a sine wave) throughout.

It seems to point to a Kafka issue (perhaps some tuning between the Flink app and 
Kafka), but I wanted to check with the experts before I start knocking down the Kafka 
admin's doors. Is there anything else I can look into? There are quite a few 
default stats in Graphite, but those were the ones that made the most sense.

Thanks, Ashish

Re: What happened if my parallelism more than kafka partitions.

2017-11-08 Thread Tzu-Li (Gordon) Tai
Hi!

You can set the parallelism of the Flink Kafka Consumer independent of the 
number of partitions.
If there are more consumer subtasks than the number of Kafka partitions to read 
(i.e. when the parallelism of the consumer is set higher than the number of 
partitions), some subtasks will simply remain idle.
Each Kafka partition is deterministically assigned to a single consumer subtask.
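
For example, a minimal sketch (topic name and properties are illustrative; assumes the usual imports and a StreamExecutionEnvironment named env):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-group");

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

    // setting the source parallelism to 4 is fine even if "my-topic" has only 2 partitions;
    // two of the four source subtasks will simply stay idle
    env.addSource(consumer)
       .name("kafka-source")
       .setParallelism(4);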

Cheers,
Gordon


On 8 November 2017 at 4:21:54 PM, yunfan123 (yunfanfight...@foxmail.com) wrote:

It seems the same partition's data will be consumed multiple times? 



-- 
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 


Re: Flink Streaming example: Kafka010Example.scala doesn't work

2017-10-20 Thread Tzu-Li (Gordon) Tai
Hi,

Thanks for reporting back! That’s good to know.

Gordon


On 20 October 2017 at 3:51:21 PM, Wojtkowski, Michal 
(michal.wojtkowski@roche.com) wrote:

Hi Gordon, 

Thanks for finding the time to write back! I managed to solve the issue, and it 
turned out to be entirely related to the Kafka instance.
Have a good day, cheers!

Michał



On Thu, Oct 19, 2017 at 5:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Michal,

I can’t seem to access the link you provided for the logs.

As for confirming whether or not some data was read / written, how exactly did 
you test that?
In the procedure you laid out, it seems like you only performed some consumer 
group offset checks using the Kafka CLI.
AFAIK, since internally the Kafka consumer does not use the consumer group 
functionality for partition assignment (instead, static assignment is used), I 
think the Kafka CLI offset tools will not work with the offsets committed back 
to Kafka.
On the other hand, the committed offsets are all exposed as the connector’s 
first-class metrics, which can be queried via the web UI / REST API.

Have you tried checking whether data has been written to the output topic by simply 
reading it (e.g. using the Kafka CLI console consumer)?
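
For example, a quick check could be (assuming the same container paths as in your steps):

$ /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-output --from-beginning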

Cheers,
Gordon

On 15 October 2017 at 8:56:19 PM, Wojtkowski, Michal 
(michal.wojtkowski@roche.com) wrote:

Hi guys

I'm trying to run the official "Kafka010Example.scala", but unfortunately it 
doesn't read from the input topic and write to the output topic as expected. What am I 
missing or doing wrong? Any help or hints much appreciated. Here's exactly what 
I did:

1) Started kafka in docker container (spotify/kafka:latest)

$ docker run -d -p 2181:2181 -p 9092:9092 spotify/kafka:latest

2) Started bash session inside of the container:

$ docker exec -it 26d1cfced4cb /bin/bash

3) Created input and output topics:

$ /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper 
localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
$ /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper 
localhost:2181 --replication-factor 1 --partitions 1 --topic test-output

$ /opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --zookeeper localhost:2181 --list
test-input
test-output


4) Launched "Kafka010Example.scala" running flink 1.3.2 in local mode 
(MiniCluster), with flink-connector-kafka-0.10_2.11

with the following args:
--input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer

5) Logs:

https://file.io/jWsqI8


6) Sent some messages to the topic:

$ /opt/kafka_2.11-0.10.1.0/bin/kafka-console-producer.sh --broker-list 
localhost:9092 --topic test-input
blah
blahh
blahhh


7) Checked offset on the output topic - NOTHING

$ /opt/kafka_2.11-0.10.1.0/bin/kafka-consumer-offset-checker.sh --zookeeper 
localhost:2181 --group myconsumer --topic test-output

Group        Topic        Pid  Offset  logSize  Lag  Owner
myconsumer   test-output  0    0       0        0    none


8) Checked consumer group offset - NOTHING

$ /opt/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --zookeeper 
localhost:2181 --describe --group myconsumer
No topic available for consumer group provided
GROUP          TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER


Any clue?

Michal




--
Michał Wojtkowski
Senior Specialist Software Development

Roche Polska Sp. z o.o.
ul. Arcb. Baraniaka 88D
61-131 Poznan, Poland
mobile: +48 519 514 924
phone: +48 612 793 030
michal.wojtkowski@roche.com
www.roche.com


Re: Flink CEP State Change Pattern

2017-10-19 Thread Tzu-Li (Gordon) Tai
Hi Philip!

I’m looping in Kostas to this thread. He might be able to provide some insights 
for your question.

Cheers,
Gordon

On 14 October 2017 at 8:54:45 PM, Philip Limbeck (philiplimb...@gmail.com) 
wrote:

Hi!  

I am quite new to Flink CEP and try to define a state change pattern  
with it. This means that only discrete changes in the event stream  
should be detected i.e.  

a a b b - triggers a single change from a to b  

Considering b the "bad" state, I would like to additionally recognize  
the state change from null (i.e. non-existing state) to b:  

b b b - triggers a single change from null to b  

Initially, I tried to model this behavior as follows:  

Pattern<...>begin("first")  
.where()  
.optional()  
.next("second")  
.where()  
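
Concretely, with hypothetical conditions on a hypothetical Event type carrying a state field, this corresponds to something like the following (1.3 Java API):

Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return e.getState().equals("a");
        }
    })
    .optional()
    .next("second")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return e.getState().equals("b");
        }
    });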

However, since state b can be detected without state a, having a state  
change from a to b results in two identified patterns:  
a b  
and  
b  

Additionally, when the "bad" state b is already given, every  
subsequent b event will detect a new b pattern which is also not what  
I want.  

When the "optional" keyword is omitted, obviously no initial b events  
are detected.  

I know that Flink 1.4.0 will support AFTER_MATCH_SKIP, which I assume  
would aid in this situation as a single b event will not take part in  
two computation states.  

Being currently stuck with 1.3.2 is there a workaround using Flink CEP  
to enable this behavior?  

I am aware of the fact that this behavior is much easier to build  
using plain Flink.  

Thank you for your support, any help is appreciated.  

Best  
Philip  


Re: Watermark on connected stream

2017-10-19 Thread Tzu-Li (Gordon) Tai
Ah, sorry for the duplicate answer, I didn’t see Piotr’s reply. Slight delay on 
the mail client.


On 19 October 2017 at 11:05:01 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) 
wrote:

Hi Kien,

The watermark of an operator with multiple inputs will be determined by the 
current minimum watermark across all inputs.

Cheers,
Gordon


On 19 October 2017 at 8:06:11 PM, Kien Truong (duckientru...@gmail.com) wrote:

Hi, 

If I connect two stream with different watermark, how are the watermark 
of the resulting stream determined ? 


Best regards, 

Kien 


Re: Accumulator with Elasticsearch Sink

2017-10-19 Thread Tzu-Li (Gordon) Tai
Hi Sendoh,

That sounds like a reasonable metric to add directly to the Elasticsearch 
connector.
Could you perhaps write a comment on that in 
https://issues.apache.org/jira/browse/FLINK-7697?

Cheers,
Gordon

On 19 October 2017 at 8:57:23 PM, Sendoh (unicorn.bana...@gmail.com) wrote:

Hi Flink users,  

Has anyone used an accumulator with the Elasticsearch sink? We would like to  
compare the last timestamps seen in the sink with the last timestamps in  
Elasticsearch, in order to see how long it takes for records to get from the  
Elasticsearch sink into Elasticsearch.  

Best,  

Sendoh  



--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: Watermark on connected stream

2017-10-19 Thread Tzu-Li (Gordon) Tai
Hi Kien,

The watermark of an operator with multiple inputs will be determined by the 
current minimum watermark across all inputs.
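
Roughly, a two-input operator tracks something like the following (just a sketch, not the actual Flink source; emitWatermark stands in for forwarding the watermark downstream):

class TwoInputWatermarkCombiner {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    void onWatermark1(long timestamp) { input1Watermark = timestamp; advanceIfPossible(); }
    void onWatermark2(long timestamp) { input2Watermark = timestamp; advanceIfPossible(); }

    private void advanceIfPossible() {
        // the operator's watermark only advances to the minimum across its inputs
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitWatermark(combinedWatermark);
        }
    }

    private void emitWatermark(long timestamp) {
        // placeholder: a real operator would forward a Watermark(timestamp) downstream
    }
}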

Cheers,
Gordon


On 19 October 2017 at 8:06:11 PM, Kien Truong (duckientru...@gmail.com) wrote:

Hi, 

If I connect two stream with different watermark, how are the watermark 
of the resulting stream determined ? 


Best regards, 

Kien 


Re: Savepoints and migrating value state data types

2017-10-06 Thread Tzu-Li (Gordon) Tai
Hi,

Yes, the AvroSerializer currently still partially uses Kryo for object copying.
Also, right now, I think the AvroSerializer is only used when the type is 
recognized as a POJO and `isForceAvroEnabled` is set on the job 
configuration. I’m not sure if that is always possible.
As mentioned in [1], we would probably need to improve the user experience for 
Avro usage.

For now, if you want to directly use Avro only for serializing your state, 
AFAIK the straightforward approach would be, as you mentioned, to extend a 
custom TypeSerializer that uses the Avro constructs.
Flink’s AvroSerializer actually already sort of does this, so you can refer to 
that implementation as a baseline.
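
For reference, a minimal sketch of the Avro round-trip that such a serializer's serialize/deserialize methods would wrap (MyState stands in for a hypothetical Avro-generated SpecificRecord class):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

class AvroStateBytes {

    // MyState -> byte[]
    static byte[] toBytes(MyState state) throws IOException {
        SpecificDatumWriter<MyState> writer = new SpecificDatumWriter<>(MyState.class);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(state, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // byte[] -> MyState, reading with the (possibly evolved) schema compiled into MyState
    static MyState fromBytes(byte[] bytes) throws IOException {
        SpecificDatumReader<MyState> reader = new SpecificDatumReader<>(MyState.class);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }
}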

Cheers,
Gordon

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-performance-td12019.html
On 5 October 2017 at 4:39:10 PM, mrooding (ad...@webresource.nl) wrote:

Gordon  

Thanks for the detailed response. I have verified your assumption and that  
is, unfortunately, the case.  

I also looked into creating a custom Kryo serializer but I got stuck on  
serializing arrays of complex types. It seems like this isn't trivial at all  
with Kryo.  

As an alternative, I've been looking into using Avro only for the Flink  
buffers. Basically, as a first step, we'd still be sending JSON messages  
through Kafka but we would use a custom TypeSerializer that converts the  
case classes to bytes using Avro and vice versa. Unfortunately,  
documentation is really scarce.  

In a different topic,  
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-performance-td12019.html,
  
it says that Avro is a bit of an odd citizen and that the AvroSerializer  
provided by Flink uses Kryo. This confirms what I've found by going through  
the source code of Flink myself.  

I hope that you can provide me with some pointers. Is extending  
TypeSerializer[T] the best way forward if we only want to use Avro for state  
buffers and thus utilize Avro's schema migration facilities?  

Any pointers would be greatly appreciated!  

Kind regards  

Marc  



--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: Fwd: Consult about flink on mesos cluster

2017-10-06 Thread Tzu-Li (Gordon) Tai
Hi Bo,

I'm not familiar with Mesos deployments, but I'll forward this to Till or Eron 
(in CC) who perhaps could provide some help here.

Cheers,
Gordon


On 2 October 2017 at 8:49:32 PM, Bo Yu (yubo1...@gmail.com) wrote:

Hello all,
This is Bo. I ran into some problems when I tried to use Flink in my Mesos cluster 
(1 master, 2 slaves with 32 CPU cores).
I tried to start mesos-appmaster.sh in Marathon, and the JobManager started 
without problems.

mesos-appmaster.sh -Djobmanager.heap.mb=1024 -Dtaskmanager.heap.mb=1024 
-Dtaskmanager.numberOfTaskSlots=32

My problem is the task managers are all located in one single slave.
1. (log1)
The initial tasks in "/usr/local/flink/conf/flink-conf.yaml" is set as 
"mesos.initial-tasks: 2".
I also set "mesos.constraints.hard.hostattribute: rack:ak09-27", which is 
the master node of the Mesos cluster.

2. (log2)
I tried many ways to distribute the tasks to all the available slaves, 
without any success.
So I decided to try adding a GROUP_BY operator, which I referenced from 
https://mesosphere.github.io/marathon/docs/constraints.html:
"mesos.constraints.hard.hostattribute: rack:ak09-27,GROUP_BY:2"
According to the log, Flink keeps waiting for more offers and the tasks never 
get launched.

Sorry, I am a newbie to Flink, and also to Mesos. Please reply if my problem is not 
clear; I would appreciate any hint about how to distribute the tasks evenly 
over the available resources.

Thank you in advance.

Best regards,

Bo



Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-09-30 Thread Tzu-Li (Gordon) Tai
 Federico D'Ambrosio <federico.dambro...@smartlab.ws>:
Hi Gordon,

I'm currently using Flink 1.3.2 in local mode.

If it's any help I realized from the log that the complete task which is 
failing is:

2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task 
    - latest_time -> (map_active_stream, map_history_stream) (1/1) 
(5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.

val events = keyedStreamByID 
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("time").name("latest_time").uid("latest_time")


val activeStream = events
  //Serialization to JsValue
  .map(event => 
event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new 
MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")

val historyStream = airtrafficEvents
  //Serialization to JsValue
  .map(event => 
event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new 
MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")



Regards,
Federico

2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
Hi,

I’m looking into this. Could you let us know the Flink version in which the 
exceptions occurred?

Cheers,
Gordon


On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio 
(federico.dambro...@smartlab.ws) wrote:

Hi, I'm coming across these Exceptions while running a pretty simple flink job.
First one:
java.lang.RuntimeException: Exception occurred while processing valve output 
watermark:   
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:
java.io.IOException: Exception while applying ReduceFunction in reducing state
at 
org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


Since it looks like something is wrong in Watermark processing, in my case 
Watermarks are generated in my KafkaSource:

val stream = env.addSource(
  new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), 
consumerConfig)
.setStartFromLatest()
.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
def extractTimestamp(element: AirTrafficEvent): Long =
  element.instantValues.time.getMillis
  })
)
These exceptions aren't really that informative per se and, from what I see, 
the task triggering these exceptions is the following operator:

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("timestamp").name("latest_time").uid("latest_time")

What could be the problem here, in your opinion? Is it not emitting watermarks 
correctly? I'm not even sure how I could reproduce these exceptions, since it looks 
like they happen pretty much randomly.

Thank you all,
Federico D'Ambrosio



--
Federico D'Ambrosio



--
Federico D'Ambrosio

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-09-29 Thread Tzu-Li (Gordon) Tai
Hi,

I’m looking into this. Could you let us know the Flink version in which the 
exceptions occurred?

Cheers,
Gordon

On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio 
(federico.dambro...@smartlab.ws) wrote:

Hi, I'm coming across these Exceptions while running a pretty simple flink job.
First one:
java.lang.RuntimeException: Exception occurred while processing valve output 
watermark:  
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:
java.io.IOException: Exception while applying ReduceFunction in reducing state
at 
org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


Since it looks like something is wrong in Watermark processing, in my case 
Watermarks are generated in my KafkaSource:

val stream = env.addSource(
  new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), 
consumerConfig)
.setStartFromLatest()
.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
def extractTimestamp(element: AirTrafficEvent): Long =
  element.instantValues.time.getMillis
  })
)
These exceptions aren't really that informative per se and, from what I see, 
the task triggering these exceptions is the following operator:

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("timestamp").name("latest_time").uid("latest_time")

What could be the problem here, in your opinion? Is it not emitting watermarks 
correctly? I'm not even sure how I could reproduce these exceptions, since it looks 
like they happen pretty much randomly.

Thank you all,
Federico D'Ambrosio

Re: Kinesis connector - Jackson issue

2017-09-26 Thread Tzu-Li (Gordon) Tai
Ah, I see.

Are you using the Flink quickstart template to build your application?
I think the exclusion is defined in the pom.xml of that archetype.

Just above the exclusion I do see this message:
“WARNING: You have to remove these excludes if your code relies on other 
version of these dependencies."


On 26 September 2017 at 5:27:47 PM, Tomasz Dobrzycki 
(dobrzycki.tom...@gmail.com) wrote:

Hi Gordon,  

Thanks for your answer.  
- I've built it with Maven 3.2.5  
- I am using Jackson in my application (version 2.7.4)  

Something that I have noticed when building the Kinesis connector is that  
it excludes Jackson:  
[INFO] Excluding com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.7.3 from the shaded jar.  
even though I can't find any mention of that in its pom.xml.  

Cheers,  
Tomasz  

On 26 September 2017 at 15:43, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote: 
 
> Hi Tomasz,  
>  
> Yes, dependency clashes may surface when executing actual job runs on  
> clusters.  
>  
> A few things to probably check first:  
> - Have you built Flink or the Kinesis connector with Maven version 3.3 or  
> above? If yes, try using a lower version, as 3.3+ results in some shading  
> issues when used to build Flink.  
> - I’m not sure if the Kinesis client has a Jackson dependency, but you could  
> also try checking if your application pulls in a conflicting Jackson version  
> (with Flink, which uses 2.7.4) via some other dependency.  
>  
> Cheers,  
> Gordon  
>  
>  
> On 26 September 2017 at 4:28:27 PM, Tomasz Dobrzycki  
> (dobrzycki.tom...@gmail.com) wrote:  
>  
> Hi guys,  
>  
> I'm working with Kinesis connector and currently trying to solve a  
> bizarre issue.  
> I had problems with Kinesis and httpcomponents which I was able to  
> solve using steps shown in:  
> https://github.com/apache/flink/pull/4150/commits/9b539470ac308d7af9df9a70792aa1fa8c6995fc
>   
>  
> That did the trick and I am able to run my code successfully via  
> IntelliJ. I am connecting to Kinesis stream hosted on AWS and reading  
> messages just fine.  
>  
> Unfortunately that is not true for running Flink via command line  
> scripts. I get this error when running start-local.sh into flink run  
> ... :  
> Caused by: java.lang.ClassNotFoundException:  
> com.fasterxml.jackson.dataformat.cbor.CBORFactory  
>  
> I have built my Kinesis connector and installed it via mvn install. Am  
> I missing some steps? I'm assuming that my code is fine given that I'm  
> able to run it through IntelliJ.  
>  
> Anyone faced this problem or maybe some solution comes to your mind?  
>  
> Cheers  
> Tomasz  


Re: Kinesis connector - Jackson issue

2017-09-26 Thread Tzu-Li (Gordon) Tai
Hi Tomasz,

Yes, dependency clashes may surface when executing actual job runs on clusters.

A few things to probably check first:
- Have you built Flink or the Kinesis connector with Maven version 3.3 or 
above? If yes, try using a lower version, as 3.3+ results in some shading 
issues when used to build Flink.
- I’m not sure if the Kinesis client has a Jackson dependency, but you could 
also try checking if your application pulls in a conflicting Jackson version 
(with Flink, which uses 2.7.4) via some other dependency.

Cheers,
Gordon

On 26 September 2017 at 4:28:27 PM, Tomasz Dobrzycki 
(dobrzycki.tom...@gmail.com) wrote:

Hi guys,  

I'm working with Kinesis connector and currently trying to solve a  
bizarre issue.  
I had problems with Kinesis and httpcomponents which I was able to  
solve using steps shown in:  
https://github.com/apache/flink/pull/4150/commits/9b539470ac308d7af9df9a70792aa1fa8c6995fc
  

That did the trick and I am able to run my code successfully via  
IntelliJ. I am connecting to Kinesis stream hosted on AWS and reading  
messages just fine.  

Unfortunately that is not true for running Flink via command line  
scripts. I get this error when running start-local.sh into flink run  
... :  
Caused by: java.lang.ClassNotFoundException:  
com.fasterxml.jackson.dataformat.cbor.CBORFactory  

I have built my Kinesis connector and installed it via mvn install. Am  
I missing some steps? I'm assuming that my code is fine given that I'm  
able to run it through IntelliJ.  

Anyone faced this problem or maybe some solution comes to your mind?  

Cheers  
Tomasz  


Re: Regarding flink-cassandra-connectors

2017-09-26 Thread Tzu-Li (Gordon) Tai
Hi Jagadish,

Yes, that indeed is something missing. If that is something you’re interested 
in, could you perhaps open a JIRA for that (AFAIK there isn’t one for the 
feature yet).

Gordon


On 26 September 2017 at 2:09:37 PM, Jagadish Gangulli (jagadi...@gmail.com) 
wrote:

Thanks Gordon,

I have a few more queries along the same lines: if I have to perform fetches, i.e. select 
queries, do I have to go for batch queries, given that no streaming support is available?

Regards,
Jagadisha G

On Tue, Sep 26, 2017 at 3:40 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Jagadish,

Yes, you are right that the Flink Cassandra connector uses the Datastax drivers 
internally, which is also the case for all the other Flink connectors; e.g., 
the Kafka connector uses the Kafka Java client, the Elasticsearch connector uses the 
ES Java client, etc.

The main advantage when using these Flink first-class supported connectors is 
basically the following:
- Most importantly, the connectors work with Flink’s checkpointing mechanism to 
achieve exactly-once or at-least-once guarantees. You can read more about that 
here [1].
- The connectors are built on Flink’s abstractions of streaming sources / 
sinks. What this means is you can basically swap out / plug-in / add sources or 
sinks to various external systems without altering the main business logic in 
your processing pipeline. i.e., also sinking your data to Elasticsearch would 
be as simple as adding an Elasticsearch sink to your pipeline output 
alongside your Cassandra sink.

Hope this clarifies some points for you!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html

On 26 September 2017 at 11:03:16 AM, Jagadish Gangulli (jagadi...@gmail.com) 
wrote:

Hi,

I have recently gotten into application development with Flink. We are trying 
to use the Flink connectors to get data in and out of Cassandra.

We attempted both the DataStax drivers and the Flink Cassandra connector. In this 
process we felt that the flink-cassandra connector is more of a wrapper on top of the 
DataStax Cassandra drivers.

Hence, could someone please explain the benefits of the flink-cassandra-connectors 
over the DataStax driver APIs? We are looking for the APIs which are better in terms 
of performance. Please let me know your thoughts.

Thanks & Regards,
Jagadisha G




Re: Regarding flink-cassandra-connectors

2017-09-26 Thread Tzu-Li (Gordon) Tai
Ah, sorry I just realized Till also answered your question on your cross-post 
at dev@.
It’s usually fine to post questions to just a single mailing list :)


On 26 September 2017 at 12:10:55 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) 
wrote:

Hi Jagadish,

Yes, you are right that the Flink Cassandra connector uses the Datastax drivers 
internally, which is also the case for all the other Flink connectors; e.g., 
the Kafka connector uses the Kafka Java client, the Elasticsearch connector uses the 
ES Java client, etc.

The main advantage when using these Flink first-class supported connectors is 
basically the following:
- Most importantly, the connectors work with Flink’s checkpointing mechanism to 
achieve exactly-once or at-least-once guarantees. You can read more about that 
here [1].
- The connectors are built on Flink’s abstractions of streaming sources / 
sinks. What this means is you can basically swap out / plug-in / add sources or 
sinks to various external systems without altering the main business logic in 
your processing pipeline. i.e., also sinking your data to Elasticsearch would 
be as simple as adding an Elasticsearch sink to your pipeline output 
alongside your Cassandra sink.

Hope this clarifies some points for you!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html

On 26 September 2017 at 11:03:16 AM, Jagadish Gangulli (jagadi...@gmail.com) 
wrote:

Hi,

I have recently gotten into application development with Flink. We are trying 
to use the Flink connectors to get data in and out of Cassandra.

We attempted both the DataStax drivers and the Flink Cassandra connector. In this 
process we felt that the flink-cassandra connector is more of a wrapper on top of the 
DataStax Cassandra drivers.

Hence, could someone please explain the benefits of the flink-cassandra-connectors 
over the DataStax driver APIs? We are looking for the APIs which are better in terms 
of performance. Please let me know your thoughts.

Thanks & Regards,
Jagadisha G



Re: Regarding flink-cassandra-connectors

2017-09-26 Thread Tzu-Li (Gordon) Tai
Hi Jagadish,

Yes, you are right that the Flink Cassandra connector uses the Datastax drivers 
internally, which is also the case for all the other Flink connectors; e.g., 
the Kafka connector uses the Kafka Java client, the Elasticsearch connector uses the 
ES Java client, etc.

The main advantage when using these Flink first-class supported connectors is 
basically the following:
- Most importantly, the connectors work with Flink’s checkpointing mechanism to 
achieve exactly-once or at-least-once guarantees. You can read more about that 
here [1].
- The connectors are built on Flink’s abstractions of streaming sources / 
sinks. What this means is you can basically swap out / plug-in / add sources or 
sinks to various external systems without altering the main business logic in 
your processing pipeline. i.e., also sinking your data to Elasticsearch would 
be as simple as adding an Elasticsearch sink to your pipeline output 
alongside your Cassandra sink.
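
For example, attaching a Cassandra sink to a DataStream<Tuple2<String, Integer>> called results would look roughly like this (keyspace, table, and contact point are illustrative; CassandraSink and ClusterBuilder come from flink-connector-cassandra, Cluster from the DataStax driver):

CassandraSink.addSink(results)
    .setQuery("INSERT INTO my_keyspace.my_table (word, count) VALUES (?, ?);")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").build();
        }
    })
    .build();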

Hope this clarifies some points for you!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html

On 26 September 2017 at 11:03:16 AM, Jagadish Gangulli (jagadi...@gmail.com) 
wrote:

Hi,

I have recently gotten into application development with Flink. We are trying 
to use the Flink connectors to get data in and out of Cassandra.

We attempted both the DataStax drivers and the Flink Cassandra connector. In this 
process we felt that the flink-cassandra connector is more of a wrapper on top of the 
DataStax Cassandra drivers.

Hence, could someone please explain the benefits of the flink-cassandra-connectors 
over the DataStax driver APIs? We are looking for the APIs which are better in terms 
of performance. Please let me know your thoughts.

Thanks & Regards,
Jagadisha G



RE: Flink kafka consumer that read from two partitions in local mode

2017-09-26 Thread Tzu-Li (Gordon) Tai
Hi,

Glad you sorted it out :)

AFAIK, the number of created Kafka partitions cannot be specified using the 
Kafka client.
If you want topics to be created with 2 partitions, you’ll have to change the 
default for that in the broker configurations.
Or, simply create the topic with the desired number of partitions.
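
For reference, the broker-side default that determines how many partitions auto-created topics get is the num.partitions setting in server.properties (e.g. num.partitions=2).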

Cheers,
Gordon

On 26 September 2017 at 9:43:33 AM, Sofer, Tovi (tovi.so...@citi.com) wrote:

Hi,

 

Issue was solved.

After your guidance on the producer part, I checked in Kafka and saw that the topic 
was created with one partition.

I re-created it with two partitions manually and that fixed the problem.

// update in KAFKA_HOME/config/server.properties : set delete.topic.enable=true
%KAFKA_HOME%\bin\windows\kafka-topics.bat --delete  --topic fix --zookeeper 
localhost:2181
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --partitions 2 --topic fix 
--zookeeper localhost:2181 --replication-factor 1
%KAFKA_HOME%\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

 

A follow-up question: is it possible to create the topic with two partitions 
while creating the FlinkKafkaProducer?

By default it seems to be created with one partition.

 

Thanks and regards,

Tovi

 

From: Sofer, Tovi [ICG-IT]  
Sent: יום ב 25 ספטמבר 2017 17:18
To: 'Tzu-Li (Gordon) Tai'; Fabian Hueske
Cc: user
Subject: RE: Flink kafka consumer that read from two partitions in local mode

 

Hi Gordon,

 

Thanks for your assistance.

 

· We are currently running Flink in local mode (MiniCluster), using 
Flink 1.3.2 and flink-connector-kafka-0.10_2.10.

 

· In the consumer log I see only 1 partition (when parallelism=1), so the 
problem indeed seems to be in the producer.

2017-09-25 17:10:58,140 WARN  org.apache.kafka.clients.consumer.ConsumerConfig 
- [Source: fix_topic -> FixMapper (1/1)] The configuration 'topic.name' was 
supplied but isn't a known config.

2017-09-25 17:10:58,143 INFO  org.apache.kafka.common.utils.AppInfoParser - 
[Source: fix_topic -> FixMapper (1/1)] Kafka version : 0.10.2.1

2017-09-25 17:10:58,144 INFO  org.apache.kafka.common.utils.AppInfoParser - 
[Source: fix_topic -> FixMapper (1/1)] Kafka commitId : e89bffd6b2eff799

2017-09-25 17:10:58,679 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: 
fix_topic -> FixMapper (1/1)] Got 1 partitions from these topics: [fix]

2017-09-25 17:10:58,679 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - [Source: 
fix_topic -> FixMapper (1/1)] Consumer is going to read the following topics 
(with number of partitions): fix (1), 

2017-09-25 17:10:58,680 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - [Source: 
fix_topic -> FixMapper (1/1)] Consumer subtask 0 will start reading the 
following 1 partitions from the committed group offsets in Kafka: 
[KafkaTopicPartition{topic='fix', partition=0}]

 

· The producer seems to write to one partition only.

internalProducer.topicPartitionsMap and Cluster.Partitions seem to have only one 
partition for the FIX topic.

 

In producer log each producer start with configuration below:

2017-09-25 17:06:49,596 INFO  org.apache.kafka.clients.producer.ProducerConfig- 
[Source: random -> Sink: fixTopicSink (2/2)] ProducerConfig values:

    acks = 1

    batch.size = 16384

    block.on.buffer.full = false

    bootstrap.servers = [localhost:9092]

    buffer.memory = 33554432

    client.id =

    compression.type = none

    connections.max.idle.ms = 54

    interceptor.classes = null

    key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer

    linger.ms = 0

    max.block.ms = 6

    max.in.flight.requests.per.connection = 5

    max.request.size = 1048576

    metadata.fetch.timeout.ms = 6

    metadata.max.age.ms = 30

    metric.reporters = []

    metrics.num.samples = 2

    metrics.sample.window.ms = 3

    partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner

    receive.buffer.bytes = 32768

    reconnect.backoff.ms = 50

    request.timeout.ms = 3

    retries = 0

    retry.backoff.ms = 100

    sasl.jaas.config = null

    sasl.kerberos.kinit.cmd = /usr/bin/kinit

    sasl.kerberos.min.time.before.relogin = 6

    sasl.kerberos.service.name = null

    sasl.kerberos.ticket.renew.jitter = 0.05

    sasl.kerberos.ticket.renew.window.factor = 0.8

    sasl.mechanism = GSSAPI

    security.protocol = PLAINTEXT

    send.buffer.bytes = 131072

    ssl….

 

RE: Flink kafka consumer that read from two partitions in local mode

2017-09-25 Thread Tzu-Li (Gordon) Tai
Hi Tovi,

Your code seems to be correct, and as Fabian described, you don’t need 
parallelism of 2 to read 2 partitions; a single parallel instance of the source 
can read multiple partitions.

I’m not sure what could have possibly gone wrong at the moment from a first 
look, so I may need to randomly ask you some questions:

Could you let me know which Flink version you are on?
Also, could you try searching in the log to see if you find consumer logs such 
as:
“Consumer subtask ... will start reading the following (numPartitions) 
partitions from ...: (partition info) "
You can try setting parallelism of the source to 1, and you should see that the 
subtask is reading 2 partitions.

From the metrics log it does seem like the consumer has picked up both 
partitions 0 and 1, but no records seem to be coming from partition 0. Have you 
perhaps tried using a non-Flink consumer, perhaps the simple console consumer, 
to read the topic, and see if records from both partitions are consumed 
properly?

Let me know, I’m sure we can figure this out somehow.

Cheers,
Gordon
On 24 September 2017 at 9:44:28 AM, Sofer, Tovi (tovi.so...@citi.com) wrote:

Thank you Fabian.

 

Fabian, Gordon, am I missing something in consumer setup?

Should I configure consumer in some way to subscribe to two partitions?

 

Thanks and regards,

Tovi

 

From: Fabian Hueske [mailto:fhue...@gmail.com]  
Sent: יום ג 19 ספטמבר 2017 22:58
To: Sofer, Tovi [ICG-IT]
Cc: user; Tzu-Li (Gordon) Tai
Subject: Re: Flink kafka consumer that read from two partitions in local mode

 

Hi Tovi,

your code looks OK to me. Maybe Gordon (in CC) has an idea what is going wrong.

Just a side note: you don't need to set the parallelism to 2 to read from two 
partitions. A single consumer instance can read from multiple partitions.

Best,

Fabian

 

2017-09-19 17:02 GMT+02:00 Sofer, Tovi <tovi.so...@citi.com>:

Hi,

 

I am trying to set up a FlinkKafkaConsumer which reads from two partitions in 
local mode, using setParallelism(2).

The producer writes to two partition (as it is shown in metrics report).

But the consumer seems to read always from one partition only.

Am I missing something in partition configuration?

 

Code:

 

Producer setup:
Configuration localConfig = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);
env.setParallelism(2);
String kafkaPort = 
parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());
SingleOutputStreamOperator fixMsgSource = 
env.addSource(srcMsgProvider.getFixMsgSource(), 
TypeInformation.of(String.class)).name(sourceGenerationType.getValue());
fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:"  + kafkaPort, 
TOPIC_NAME, new SimpleStringSchema()))
.name(“fix_topic”);
env.execute(“MsgSimulatorJob”);
 

 

Consumer setup:


String topicName = “fix”;
Configuration conf = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(2);
env.getConfig().setGlobalJobParameters(configParams); // make parameters 
available in the web interface
DeserializationSchema<Tuple2<Long, String>> deserializationSchema = new 
SimpleStringAndTimestampDeserializationSchema ();
FlinkKafkaConsumer010<Tuple2<Long, String>> kafkaConsumer = new 
FlinkKafkaConsumer010<>(topicName, deserializationSchema, 
kafkaParams.getProperties());
DataStream<Tuple2<Long, String>> fixMessagesStream = 
env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);
 

As you can see in output, only 1 consumer partition seems to be used:

Producer output:

2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0

2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.1.numRecordsInPerSecond: 19836.0333

2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.0.numRecordsInPerSecond: 20337.9334

2017-09-19 14:40:45,819 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0

Consumer output:

2017-09-19 14:40:45,116 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368

2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637

2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.

Re: StreamCorruptedException

2017-09-25 Thread Tzu-Li (Gordon) Tai
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: 
GC overhead limit exceeded
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more


So, it looks like the Job Manager ran out of memory, thanks to the 
"Progressively Getting Worse" checkpoints. Any ideas on how to make the 
checkpoints faster?

 




On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: LocalStreamEnvironment - configuration doesn't seem to be used in RichFunction operators

2017-09-23 Thread Tzu-Li (Gordon) Tai
Yes, I agree that the behavior can be quite surprising, and if it's not stated 
somewhere in the docs already, we should update them.

Pass in a Serializable "injector"/proxy object in the constructor
In the "open" (or body of the function) get the things/initialize stuff I want 
that may or may not be Serializable, e.g. an HTTP client or database connection 
from that object
Don't use the Configuration instance since it doesn't do anything anyways
Yes, I think you’re on the right track with this :)

Cheers,
Gordon


On 22 September 2017 at 11:08:21 PM, Michael Kobit (mko...@gmail.com) wrote:

Thanks for the response.

That is a bit surprising that it is always a new instance given the various API 
signatures that take in a Configuration instance. The best practices docs 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program
 ) also sort of mention it, but I just noticed most of those seem like they are 
for the DataSet API rather than the DataStream API (I don't know if there is a 
big difference between the programming APIs there). I'm still new to these 
things, so I could be making invalid assumptions, too.

I think I have a simple idea for how to get dependency-style injection working 
anyway:
Pass in a Serializable "injector"/proxy object in the constructor
In the "open" (or body of the function) get the things/initialize stuff I want 
that may or may not be Serializable, e.g. an HTTP client or database connection 
from that object
Don't use the Configuration instance since it doesn't do anything anyways
I haven't thought through any possible security holes or considerations with 
this approach yet.

Thanks for the response, that clears up my confusion - now just to explore and 
find some better ways to test this stuff!

On Fri, Sep 22, 2017 at 11:51 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi,

The passing in of a Configuration instance in the open method is actually a 
leftover artifact of the DataStream API that remains only due to API backwards 
compatibility reasons.
There’s actually no way to modify what configuration is retrieved there (and it 
is actually always a new empty Configuration).

Normally, to inject dependencies into your operators, you would simply do that 
by supplying them through the constructor of the operator and storing them as class 
fields for future use in the operator's work methods.
Make sure that they are serializable, as the operator will need to be 
serialized when deploying the job. I’m assuming that this should be possible 
for you anyway, since you were trying to write that information into the 
Configuration.

Hope this helps!

Cheers,
Gordon


On 20 September 2017 at 11:25:41 PM, Michael Kobit (mko...@gmail.com) wrote:

I'm new to Flink and in the process of trying to write a few operators and 
tests for them. One of the issues I've run into is "how do I properly set up 
the dependencies for an operator". I've discovered the serialization 
constraints and learned about the execution some model as I've started to 
progress through it, but I'm still struggling to find an analog for dependency 
injection in Flink.

I was experimenting with different ways to supply configuration for the *Rich* 
functions to basically set themselves up and tear themselves down with their 
dependencies on open/close. I wanted to basically "inject" a dependency say 
like an HTTP client that caches, and then mock that dependency for a local test 
instead of actually making HTTP calls. It seemed like it could be done by 
getting the correct implementation types from the config using some 
custom injector type (analogous to Spring or Guice dependency injection). I 
know I have to deal serialization of the operators, which is why I was thinking 
I could do this in open/close and have the magical injector be serializable 
(and possibly be part of the config). This may or may not be a bad idea 
already, but bear with me (and any feedback is very appreciated).

I was doing some local testing using StreamExecutionEnvironment, but wasn't 
able to actually pass in configuration options to the local stream execution.

I tried it these ways:
Create with a config - StreamExecutionEnvironment.createLocalEnvironment(1, 
configuration);
Configure the created LocalStreamEnvironment by 
env.getConfig().setGlobalJobParameters(configuration)
Configure the DataStreamSource by 
source.getExecutionConfig().setGlobalJobParameters(configuration)
Configure the SingleOutputStreamOperator by 
mapped.getExecutionConfig().setGlobalJobParameters(configuration)
All 4 of those failed, so I felt like I am doing something wrong here, and 
wanted to reach out.

Here is the example code where all of those tests failing:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.common.functions.RichMapFunction;

Re: LocalStreamEnvironment - configuration doesn't seem to be used in RichFunction operators

2017-09-22 Thread Tzu-Li (Gordon) Tai
Hi,

The passing in of a Configuration instance in the open method is actually a 
leftover artifact of the DataStream API that remains only due to API backwards 
compatibility reasons.
There’s actually no way to modify what configuration is retrieved there (and it 
is actually always a new empty Configuration).

Normally, to inject dependencies into your operators, you would simply do that 
by supplying them through the constructor of the operator and storing them as class 
fields for future use in the operator's work methods.
Make sure that they are serializable, as the operator will need to be 
serialized when deploying the job. I’m assuming that this should be possible 
for you anyway, since you were trying to write that information into the 
Configuration.
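
As a rough sketch (the factory and client interfaces below are made up purely for illustration):

import java.io.Serializable;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// hypothetical types, only to illustrate the pattern
interface HttpClientLike { String lookup(String key); }
interface ClientFactory extends Serializable { HttpClientLike create(); }

public class EnrichingMapper extends RichMapFunction<String, String> {

    private final ClientFactory clientFactory;   // serializable, shipped with the operator
    private transient HttpClientLike client;     // non-serializable, created per subtask

    public EnrichingMapper(ClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }

    @Override
    public void open(Configuration parameters) {
        // the Configuration argument is ignored on purpose, as explained above
        client = clientFactory.create();
    }

    @Override
    public String map(String value) {
        return client.lookup(value);
    }
}

In a test you could then pass in a factory that returns a mock client instead of a real one.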

Hope this helps!

Cheers,
Gordon

On 20 September 2017 at 11:25:41 PM, Michael Kobit (mko...@gmail.com) wrote:

I'm new to Flink and in the process of trying to write a few operators and 
tests for them. One of the issues I've run into is "how do I properly set up 
the dependencies for an operator". I've discovered the serialization 
constraints and learned about the execution some model as I've started to 
progress through it, but I'm still struggling to find an analog for dependency 
injection in Flink.

I was experimenting with different ways to supply configuration for the *Rich* 
functions to basically set themselves up and tear themselves down with their 
dependencies on open/close. I wanted to basically "inject" a dependency say 
like an HTTP client that caches, and then mock that dependency for a local test 
instead of actually making HTTP calls. It seemed like it could be done by 
getting the correct implementation types from the config using some 
custom injector type (analogous to Spring or Guice dependency injection). I 
know I have to deal serialization of the operators, which is why I was thinking 
I could do this in open/close and have the magical injector be serializable 
(and possibly be part of the config). This may or may not be a bad idea 
already, but bear with me (and any feedback is very appreciated).

I was doing some local testing using StreamExecutionEnvironment, but wasn't 
able to actually pass in configuration options to the local stream execution.

I tried it these ways:
Create with a config - StreamExecutionEnvironment.createLocalEnvironment(1, 
configuration);
Configure the created LocalStreamEnvironment by 
env.getConfig().setGlobalJobParameters(configuration)
Configure the DataStreamSource by 
source.getExecutionConfig().setGlobalJobParameters(configuration)
Configure the SingleOutputStreamOperator by 
mapped.getExecutionConfig().setGlobalJobParameters(configuration)
All 4 of those failed, so I felt like I am doing something wrong here, and 
wanted to reach out.

Here is the example code where all of those tests failing:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import java.util.Iterator;

public class FlinkInspection {

@Test
public void issueWithLocalStreamEnvironmentCreateWithConfiguration() throws 
Exception {
Configuration configuration = new Configuration();
configuration.setInteger("key", 10);
LocalStreamEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
DataStreamSource<Integer> source = env.fromElements(1, 2);

SingleOutputStreamOperator<Integer> mapped = source.map(new 
ConfigurationRetrievingOperator());

Iterator<Integer> collection = DataStreamUtils.collect(mapped);
env.execute();

assertThat(collection).containsExactlyInAnyOrder(10, 20);
}

@Test
public void 
issueWithLocalStreamEnvironmentConfiguredWithWithConfiguration() throws 
Exception {
Configuration configuration = new Configuration();
configuration.setInteger("key", 10);
LocalStreamEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(1);
env.getConfig().setGlobalJobParameters(configuration);
DataStreamSource<Integer> source = env.fromElements(1, 2);

SingleOutputStreamOperator<Integer> mapped = source.map(new 
ConfigurationRetrievingOperator());

Iterator<Integer> collection = DataStreamUtils.collect(mapped);
env.execute();

assertThat(collection).containsExactlyInAnyOrder(10, 20);
}

@Test
public void issueWithLocalStreamEnvironmentConfiguringDataStreamSource() 
throws Exception {
LocalStreamEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(1);
 

Re: Clarifications on FLINK-KAFKA consumer

2017-09-22 Thread Tzu-Li (Gordon) Tai
Hi Rahul!

1. Will FLink-Kafka consumer 0.8x run on multiple task slots or a single task 
slot? Basically I mean if its going to be a parallel operation or a non 
parallel operation?
Yes, the FlinkKafkaConsumer is a parallel consumer.

2. If its a parallel operation, then do multiple task slots read data from 
single kafka partition or multiple kafka partition?
Each single parallel instance of a FlinkKafkaConsumer source can subscribe to 
multiple Kafka partitions. Each Kafka partition is handled by exactly one 
FlinkKafkaConsumer parallel instance.

3. If data is read from multiple Kafka partition, then how duplication is 
avoided? Is it done from KAFKA or by FLINK?
I’m not sure exactly what 
you are referring to by “duplication” here. Do you mean duplication in the data 
itself in the Kafka topics, or duplicated consumption by Flink?
If it is the former: prior to Kafka 0.11, Kafka writes did not support 
transactions and therefore can only have at-least-once writes.
If you mean the latter: the FlinkKafkaConsumer achieves exactly-once guarantees 
when consuming from Kafka topics using Flink’s checkpointing mechanism. You can 
read about that here [1][2].
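As a rough illustration of the parallel consumer together with checkpointing (a sketch only; topic name, addresses, group id and parallelism are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class KafkaConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpointing is what gives the consumer its exactly-once guarantees
        env.enableCheckpointing(5000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("zookeeper.connect", "localhost:2181"); // required by the 0.8 consumer
        props.setProperty("group.id", "my-group");                // placeholder

        // the source is a parallel operator; each parallel subtask is assigned
        // a subset of the topic's partitions
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props))
                .setParallelism(4);

        stream.print();
        env.execute("kafka-consumer-sketch");
    }
}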
Hope the pointers help!

- Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-fault-tolerance

On 22 September 2017 at 10:46:55 AM, Rahul Raj (rahulrajms...@gmail.com) wrote:

Hi,

I have just started working with FLINK and I am working on a project which 
involves reading KAFKA data and processing it. Following questions came to my 
mind:

1. Will FLink-Kafka consumer 0.8x run on multiple task slots or a single task 
slot? Basically I mean if its going to be a parallel operation or a non 
parallel operation?

2. If its a parallel operation, then do multiple task slots read data from 
single kafka partition or multiple kafka partition?

3. If data is read from multiple Kafka partition, then how duplication is 
avoided? Is it done from KAFKA or by FLINK?

Rahul Raj

Re: StreamCorruptedException

2017-09-21 Thread Tzu-Li (Gordon) Tai
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Savepoints and migrating value state data types

2017-09-21 Thread Tzu-Li (Gordon) Tai
Hi!

The exception that you have bumped into indicates that on the restore of the
savepoint, the serializer for that registered state in the savepoint no
longer exists. This prevents restoring savepoints taken with memory state
backends because there will be no serializer available to deserialize the
state at restore time.

My current guess of what is happening is that the generated serializers for
case class B1 has different generated names, and therefore in your modified
repackaged job, it would be as if the previous serializer no longer exists
in the classpath.

The serializer generation for Scala case classes (the
`createTypeInformation[B1]` call) depends on some Scala macros, and the
resulting generated anonymous class is sensitive to quite a few factors.
Could you perhaps try to verify this by checking the classname of the
generated serializers (you can get the serializer by
`createTypeInformation[B1].getSerializer(new Configuration())`)?

If they are different for the same case class across 2 different
compilations of your job (one with the B2 case class, one without), then my
assumption is correct. Otherwise, we would of course need to look deeper.
I'll also think about how best to work around that for now in the meantime
and get back to you.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Custom Serializers

2017-09-15 Thread Tzu-Li (Gordon) Tai
Hi Nuno,

Because of this, we have a legacy structure that I showed before. 
Could you perhaps include more information about this legacy structure you 
mentioned here in this mail thread? I couldn’t find any other reference to 
that. That could be helpful to understanding your use case more here.

- Gordon



On 15 September 2017 at 12:59:15 PM, nragon 
(nuno.goncal...@wedotechnologies.com) wrote:

Hi,  

First of all, great #FF17, really enjoyed it.  
After attending some of the dataArtisans folks talks, realized that  
serialization should be optimized if there is no way to use supported  
objects.  
In my case, users can configure their source in our application online which  
gives them freedom to dynamically change the number and type of attributes.  
Moreover, between operator the object can be changed in terms of number of  
attributes.  
Because of this, we have a legacy structure that I showed before.  
Should I implement my own TypeInformation, TypeComparator, TypeSerializer  
and TypeInfoFactory?  
Am I forgetting something?  

Thanks,  
Nuno  




--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: Change Kafka cluster without loosing the Flink's state

2017-09-14 Thread Tzu-Li (Gordon) Tai
Simply like this:
env.addSource(new FlinkKafkaConsumer(...)).uid(“some-unique-id”)

The same goes for any other operator.

However, do keep in mind this bug that was just recently uncovered: 
https://issues.apache.org/jira/browse/FLINK-7623.
What I described in my previous reply would not work as expected if your Kafka 
consumer was chained to some stateful operator.
The workaround, for now, would be to explicitly disable chaining of the 
consumer source with any stateful operators before taking the savepoint and 
changing the operator UID.
If your consumer source is not chained with any stateful operator, then you can 
ignore this and safely proceed.
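For illustration, a rough sketch of that workaround (the stateful mapper, UIDs, topic and properties below are hypothetical placeholders):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "new-kafka-broker:9092"); // new cluster (placeholder)
properties.setProperty("group.id", "my-group");                       // placeholder

DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), properties))
        // a UID the consumer never had before, so no offset state is restored for it
        .uid("kafka-source-after-migration");

stream
        // a hypothetical stateful operator; startNewChain() keeps it out of the
        // consumer's operator chain (workaround for FLINK-7623)
        .map(new MyStatefulEnricher()).startNewChain()
        // keep the old UID here so this operator's state is still restored
        .uid("stateful-enricher")
        .print();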

Cheers,
Gordon

On 14 September 2017 at 3:49:31 PM, kla (lalafar...@gmail.com) wrote:

Hi Gordon,  

Thanks for your quick reply.  

I have following consumer:  

jobConfiguration.getEnv().addSource(  
new FlinkKafkaConsumer010<>(properties.getProperty(TOPIC), deserializer,  
properties));  

How can I set the UID for the consumer ?  

Thanks again for help!  

Regards,  
Konstantin  



--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: Change Kafka cluster without loosing the Flink's state

2017-09-14 Thread Tzu-Li (Gordon) Tai
Hi Konstantin,

After migrating and connecting to the new Kafka cluster, do you want the Kafka 
consumer to start fresh without any partition offset state (and therefore will 
re-establish its partition-to-subtask assignments), while keeping all other 
operator state in the pipeline intact?
If so, that is definitely possible. Simply explicitly set a new different 
operator UID for the FlinkKafkaConsumer. When the savepoint is restored, the 
consumer will not be restored with any state since it will be recognized by 
Flink as a new operator in the pipeline without any state.

Cheers,
Gordon

On 14 September 2017 at 11:56:38 AM, kla (lalafar...@gmail.com) wrote:

Hi guys, 

We have a running apache flink streaming job which interacts with apache 
kafka (consumer and producer). 
Now we would like to change the kafka cluster without losing Flink's state. 

Is it possible to do it ? If yes, what is the right way to do it ? 

Thanks in advance! 
Best, 
Konstantin 



-- 
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 


Re: Using latency markers

2017-09-13 Thread Tzu-Li (Gordon) Tai
Hi Aitozi,

Yes, I think we haven’t really pin-pointed out the actual cause of the problem, 
but if you have a fix for that and can provide a PR we can definitely look at 
it! That would be helpful.
Before opening a PR, also make sure to first open a JIRA for the issue (I don’t 
think there is one yet for this issue).

Cheers,
Gordon

On 13 September 2017 at 12:14:42 PM, aitozi (gjying1...@gmail.com) wrote:

Hi, Aljoscha,  

the dashboard shows NaN just because the value of the latencyGauge is not  
numerical, so it can't be shown in the dashboard. I removed the other  
latency descriptions except the sink's, so I can see the latency in the dashboard.  
Do I need to post a PR?  

thanks,  
Aitozi  



--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: LatencyMarker

2017-09-13 Thread Tzu-Li (Gordon) Tai
Hi,

There is actually latency metrics in the Web UI, but I think there was also 
some previously reported problem on that [1].
Is there a JIRA for the patch work you did that you mentioned? If you have a 
fix for that we could definitely have a look at it.

Looping in Aljoscha also, who I think was looking into the issue of latency 
metrics not showing and may have some status updates on that.

Cheers,
Gordon

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-latency-markers-td14791.html#a14799

On 10 September 2017 at 8:39:46 AM, aitozi (gjying1...@gmail.com) wrote:

Hi,  

You are right, I have confirmed that backpressure will increase the  
latency, and I have another question: the latency value is currently not  
visualized in the dashboard. Is there any plan to do this? I have added it  
by removing the other operators and only keeping the source -> end latency, so  
that I can see the latency in the dashboard, if the community accepts the patch.  

Thanks.  

Tzu-Li (Gordon) Tai wrote  
> Hi!  
>  
> Yes, backpressure should also increase the latency value calculated from  
> LatencyMarkers.  
> LatencyMarkers are special events that flow along with the actual stream  
> records, so they should also be affected by backpressure.  
>  
> Are you asking because you observed otherwise?  
>  
> Cheers,  
> Gordon  
>  
>  
>  
> --  
> Sent from:  
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/  





--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: Exception when using keyby operator

2017-09-13 Thread Tzu-Li (Gordon) Tai
Following up: here’s the JIRA ticket for improving the POJO data type 
documentation - https://issues.apache.org/jira/browse/FLINK-7614.

- Gordon


On 11 September 2017 at 10:31:23 AM, Sridhar Chellappa (flinken...@gmail.com) 
wrote:

That fixed my issue. Thanks. I also agree we need to fix the Documentation

On Thu, Sep 7, 2017 at 6:15 PM, Timo Walther  wrote:
Hi Sridhar,

according to the exception, your "meEvents" stream is not a POJO type. You can check 
that by printing "meEvents.getType()". In general, you can always check the log 
for such problems. There should be something like:

14:40:57,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor 
- class org.apache.flink.streaming.examples.wordcount.WordCount$MyEvent does 
not contain a setter for field responseTime
14:40:57,083 INFO org.apache.flink.api.java.typeutils.TypeExtractor 
- class org.apache.flink.streaming.examples.wordcount.WordCount$MyEvent is not 
a valid POJO type because not all fields are valid POJO fields.

The problem is that your setters have a return type. A POJO setter usually 
should have a void return type. But I agree that this should be mentioned in 
the documentation.
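For illustration, a field that the POJO type extraction accepts looks like this (a generic sketch, not the actual event class from this thread):

public class MyEvent {

    private String field1;

    public MyEvent() {
        // POJO rules also require a public no-argument constructor
    }

    public String getField1() {
        return field1;
    }

    // void return type: this is what the type extractor expects for a setter
    public void setField1(String field1) {
        this.field1 = field1;
    }
}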

Regards,
Timo


Am 07.09.17 um 05:20 schrieb Sridhar Chellappa:

I am trying to use the KeyBy operator as follows :


    Pattern myEventsCEPPattern =
Pattern.begin("FirstEvent")
    .subtype(MyEvent.class)
    .next("SecondEvent")
    .subtype(MyEvent.class)
    .within(Time.hours(1));



    PatternStream myEventsPatternStream =
    CEP.pattern(
    meEvents.keyBy("field1", "field6"),
    myEventsCEPPattern
    );



When I run the program, I get the following exception:

The program finished with the following exception:

This type (GenericType) cannot be used as key.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:330)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)


MyEvent is a POJO. What is that I am doing wrong?


Here is the relevant code :

public abstract class AbstractEvent {
    private String field1;
    private String field2;
    private String field3;
    private String field4;
    private Timestamp eventTimestmp;

    public AbstractEvent(String field1, String field2, String field3, String 
field4, Timestamp eventTimestmp) {
    this.field1 = field1;
    this.field2 = field2;
    this.field3 = field3;
    this.field4 = field4;
    this.eventTimestmp = eventTimestmp;
    }

    public AbstractEvent() {
    }

    public String getField1() {
    return field1;
    }

    public AbstractEvent setField1(String field1) {
    this.field1 = field1;
    return this;
    }

    public String getField2() {
    return field2;
    }

    public AbstractEvent setField2(String field2) {
    this.field2 = field2;
    return this;
    }

    public String getField3() {
    return field3;
    }

    public AbstractEvent setField3(String field3) {
    this.field3 = field3;
    return this;
    }

    public String getField4() {
    return field4;
    }

    public AbstractEvent setField4(String field4) {
    this.field4 = field4;
    return this;
    }

    public Timestamp getEventTimestmp() {
    return eventTimestmp;
    }

    public AbstractEvent setEventTimestmp(Timestamp eventTimestmp) {
    this.eventTimestmp = eventTimestmp;
    return this;
    }

    @Override
    public boolean equals(Object o) {
    if (this == o) {
    return true;
    }
    if (!(o instanceof AbstractEvent)) {
    return false;
    }

    AbstractEvent that = (AbstractEvent) o;

    if (!getField1().equals(that.getField1())) {
    return false;
    }
    if (!getField2().equals(that.getField2())) {
    return false;
    }
    if (!getField3().equals(that.getField3())) {
    return false;
    }
    if (!getField4().equals(that.getField4())) {
    return false;
    }
    return getEventTimestmp().equals(that.getEventTimestmp());
    }

    @SuppressWarnings({"MagicNumber"})
    @Override
    public int hashCode() {
    int result = getField1().hashCode();
    result = 31 * result + getField2().hashCode();
    result = 31 * result + getField3().hashCode();
    result = 31 * result + getField4().hashCode();
    result = 31 * result + getEventTimestmp().hashCode();
    return result;
    }

    @Override
    public String toString() {
    return "AbstractEvent{"
    + "field1='" + field1 + '\''
    + ", field2='" + field2 + '\''
    + ", field3='" + field3 + '\''
    + ", field4='" + field4 + '\''
    + ", eventTimestmp=" + eventTimestmp
   

Re: BucketingSink never closed

2017-09-13 Thread Tzu-Li (Gordon) Tai
Ah, sorry, one correction. Just realized there’s already some analysis of the 
BucketingSink closing issue in this mail thread.
Please ignore my request for relevant logs :)


On 13 September 2017 at 10:56:10 AM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) 
wrote:

Hi Flavio,

Let me try to understand / look at some of the problems you have encountered.
checkpointing: it's not clear which checkpointing system to use and how to 
tune/monitor it and avoid OOM exceptions.
What do you mean by which "checkpointing system” to use? Do you mean state 
backends? Typically, you would only get OOM exceptions for memory-backed state 
backends if the state size exceeds the memory capacity. State sizes can be 
queried from the REST APIs / Web UI.
cleanup: BucketingSink doesn't always move to final state
This sounds like a bug that we should look into. Do you have any logs on which 
you observed this?

missing output formats: parquet support to write generic Rows not very well 
supported (at least out of the box) [1]
Would you be up to opening up JIRAs for what you think is missing (if there 
isn’t one already)?

progress monitoring: for example in the ES connector there's no way (apart from 
using accumulators) to monitor the progress of the indexing
Maybe we can add some built-in metric in the ES sink connector that tracks the 
number of successfully indexed elements, which can then be queried from the 
REST API / Web UI. That wouldn’t be too much effort. What do you think, would 
that be useful for your case?

Would be happy to hear your thoughts on this!

Cheers,
Gordon


On 12 September 2017 at 11:36:27 AM, Flavio Pompermaier (pomperma...@okkam.it) 
wrote:

For the moment I give up with streaming...too many missing/unclear features wrt 
batch. 
For example:
checkpointing: it's not clear which checkpointing system to use and how to 
tune/monitor it and avoid OOM exceptions. Moreover is it really necessary to 
use it? For example if I read a file from HDFS and I don't have a checkpoint it 
could be ok to re-run the job on all the data in case of errors (i.e. the 
stream is managed like a batch)
cleanup: BucketingSink doesn't always move to final state
missing output formats: parquet support to write generic Rows not very well 
supported (at least out of the box) [1]
progress monitoring: for example in the ES connector there's no way (apart from 
using accumulators) to monitor the progress of the indexing
[1] 
https://stackoverflow.com/questions/41144659/flink-avro-parquet-writer-in-rollingsink

Maybe I'm wrong with those points but the attempt to replace my current batch 
system with a streaming one had no luck with those points.

Best,
Flavio

On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,

Expanding a bit on Kostas' answer. Yes, your analysis is correct, the problem 
is that the job is shutting down before a last checkpoint can "confirm" the 
written bucket data by moving it to the final state. The problem, as Kostas 
noted is that a user function (and thus also BucketingSink) does not know 
whether close() is being called because of a failure or because normal job 
shutdown. Therefore, we cannot move data to the final stage there.

Once we have the issue that Kostas posted resolved, we can also resolve this 
problem for the BucketingSink.

Best,
Aljoscha

On 8. Sep 2017, at 16:48, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Flavio,

If I understand correctly, I think you bumped into this issue: 
https://issues.apache.org/jira/browse/FLINK-2646

There is also a similar discussion on the BucketingSink here: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468

Kostas

On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,
I'm trying to test a streaming job but the files written by the BucketingSink 
are never finalized (remains into the pending state).
Is this caused by the fact that the job finishes before the checkpoint?
Shouldn't the sink properly close anyway?

This is my code:

  @Test
  public void testBucketingSink() throws Exception {
    final StreamExecutionEnvironment senv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tEnv = 
TableEnvironment.getTableEnvironment(senv);
    senv.enableCheckpointing(5000);
    DataStream<String> testStream = senv.fromElements(//
        "1,aaa,white", //
        "2,bbb,gray", //
        "3,ccc,white", //
        "4,bbb,gray", //
        "5,bbb,gray" //
    );
    final RowTypeInfo rtf = new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO, 
        BasicTypeInfo.STRING_TYPE_INFO);
    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Row map(String st

Re: BucketingSink never closed

2017-09-13 Thread Tzu-Li (Gordon) Tai
Hi Flavio,

Let me try to understand / look at some of the problems you have encountered.
checkpointing: it's not clear which checkpointing system to use and how to 
tune/monitor it and avoid OOM exceptions.
What do you mean by which "checkpointing system” to use? Do you mean state 
backends? Typically, you would only get OOM exceptions for memory-backed state 
backends if the state size exceeds the memory capacity. State sizes can be 
queried from the REST APIs / Web UI.
cleanup: BucketingSink doesn't always move to final state
This sounds like a bug that we should look into. Do you have any logs on which 
you observed this?

missing output formats: parquet support to write generic Rows not very well 
supported (at least out of the box) [1]
Would you be up to opening up JIRAs for what you think is missing (if there 
isn’t one already)?

progress monitoring: for example in the ES connector there's no way (apart from 
using accumulators) to monitor the progress of the indexing
Maybe we can add some built-in metric in the ES sink connector that tracks the 
number of successfully indexed elements, which can then be queried from the 
REST API / Web UI. That wouldn’t be too much effort. What do you think, would 
that be useful for your case?

Would be happy to hear your thoughts on this!

Cheers,
Gordon


On 12 September 2017 at 11:36:27 AM, Flavio Pompermaier (pomperma...@okkam.it) 
wrote:

For the moment I give up with streaming...too many missing/unclear features wrt 
batch. 
For example:
checkpointing: it's not clear which checkpointing system to use and how to 
tune/monitor it and avoid OOM exceptions. Moreover is it really necessary to 
use it? For example if I read a file from HDFS and I don't have a checkpoint it 
could be ok to re-run the job on all the data in case of errors (i.e. the 
stream is managed like a batch)
cleanup: BucketingSink doesn't always move to final state
missing output formats: parquet support to write generic Rows not very well 
supported (at least out of the box) [1]
progress monitoring: for example in the ES connector there's no way (apart from 
using accumulators) to monitor the progress of the indexing
[1] 
https://stackoverflow.com/questions/41144659/flink-avro-parquet-writer-in-rollingsink

Maybe I'm wrong with those points but the attempt to replace my current batch 
system with a streaming one had no luck with those points.

Best,
Flavio

On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek  wrote:
Hi,

Expanding a bit on Kostas' answer. Yes, your analysis is correct, the problem 
is that the job is shutting down before a last checkpoint can "confirm" the 
written bucket data by moving it to the final state. The problem, as Kostas 
noted is that a user function (and thus also BucketingSink) does not know 
whether close() is being called because of a failure or because normal job 
shutdown. Therefore, we cannot move data to the final stage there.

Once we have the issue that Kostas posted resolved, we can also resolve this 
problem for the BucketingSink.

Best,
Aljoscha

On 8. Sep 2017, at 16:48, Kostas Kloudas  wrote:

Hi Flavio,

If I understand correctly, I think you bumped into this issue: 
https://issues.apache.org/jira/browse/FLINK-2646

There is also a similar discussion on the BucketingSink here: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468

Kostas

On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier  wrote:

Hi to all,
I'm trying to test a streaming job but the files written by the BucketingSink 
are never finalized (remains into the pending state).
Is this caused by the fact that the job finishes before the checkpoint?
Shouldn't the sink properly close anyway?

This is my code:

  @Test
  public void testBucketingSink() throws Exception {
    final StreamExecutionEnvironment senv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tEnv = 
TableEnvironment.getTableEnvironment(senv);
    senv.enableCheckpointing(5000);
    DataStream<String> testStream = senv.fromElements(//
        "1,aaa,white", //
        "2,bbb,gray", //
        "3,ccc,white", //
        "4,bbb,gray", //
        "5,bbb,gray" //
    );
    final RowTypeInfo rtf = new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO, 
        BasicTypeInfo.STRING_TYPE_INFO);
    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Row map(String str) throws Exception {
        String[] split = str.split(Pattern.quote(","));
        Row ret = new Row(3);
        ret.setField(0, split[0]);
        ret.setField(1, split[1]);
        ret.setField(2, split[2]);
        return ret;
      }
    }).returns(rtf);

    String columnNames = "id,value,state";
    final String dsName = "test";
    

Re: can flink do streaming from data sources other than Kafka?

2017-09-07 Thread Tzu-Li (Gordon) Tai
Ah, I see. I’m not aware of any existing work / JIRAs on streaming sources for 
Cassandra or HBase, only sinks.
If you are interested in one, could you open JIRAs for them?


On 7 September 2017 at 4:11:05 PM, kant kodali (kanth...@gmail.com) wrote:

Hi Gordon,

Thanks for the response, I did go over the links for sources and sinks prior to 
posting my question. Maybe, I didn't get my question across correctly so let me 
rephrase it. Can I get data out of data stores like Cassandra, Hbase in a 
streaming manner? coz, currently more or less all the sources are of message 
queue family.

Thanks,
Kant

On Thu, Sep 7, 2017 at 1:04 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi!


I am wondering if Flink can do streaming from data sources other than Kafka. 
For example can Flink do streaming from a database like Cassandra, HBase, 
MongoDb to sinks like says Elastic search or Kafka.

Yes, Flink currently supports various connectors for different sources and 
sinks. For an overview you can check out this documentation [1]
Apache Bahir [2] also maintains some Flink connectors and is released 
separately.

Also for out of core stateful streaming. Is RocksDB the only option?
Currently, RocksDB is the only option for out-of-core state. There was some 
previous discussion for a Cassandra state backend, though [3].

- Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/index.html
[2] http://bahir.apache.org/
[3] https://issues.apache.org/jira/browse/FLINK-4266

On 7 September 2017 at 2:58:38 PM, kant kodali (kanth...@gmail.com) wrote:

Hi All,

I am wondering if Flink can do streaming from data sources other than Kafka. 
For example can Flink do streaming from a database like Cassandra, HBase, 
MongoDb to sinks like says Elastic search or Kafka.

Also for out of core stateful streaming. Is RocksDB the only option? Can I use 
some other key value store that has SQL interface (since RocksDB doesn't)?

Thanks,
kant



Re: can flink do streaming from data sources other than Kafka?

2017-09-07 Thread Tzu-Li (Gordon) Tai
Hi!


I am wondering if Flink can do streaming from data sources other than Kafka. 
For example can Flink do streaming from a database like Cassandra, HBase, 
MongoDb to sinks like says Elastic search or Kafka.

Yes, Flink currently supports various connectors for different sources and 
sinks. For an overview you can check out this documentation [1]
Apache Bahir [2] also maintains some Flink connectors and is released 
separately.

Also for out of core stateful streaming. Is RocksDB the only option?
Currently, RocksDB is the only option for out-of-core state. There was some 
previous discussion for a Cassandra state backend, though [3].

- Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/index.html
[2] http://bahir.apache.org/
[3] https://issues.apache.org/jira/browse/FLINK-4266

On 7 September 2017 at 2:58:38 PM, kant kodali (kanth...@gmail.com) wrote:

Hi All,

I am wondering if Flink can do streaming from data sources other than Kafka. 
For example can Flink do streaming from a database like Cassandra, HBase, 
MongoDb to sinks like says Elastic search or Kafka.

Also for out of core stateful streaming. Is RocksDB the only option? Can I use 
some other key value store that has SQL interface (since RocksDB doesn't)?

Thanks,
kant

Re: Process Function

2017-09-05 Thread Tzu-Li (Gordon) Tai
Hi Navneeth,

Currently, I don't think there is any built-in functionality to trigger
onTimer periodically.
As for the second part of your question, do you mean that you want to know
which key the fired timer was registered for? I think this also isn't
possible right now.

I'm looping in Aljoscha in CC in case he has more insight on this.

Cheers,
Gordon


On Tue, Sep 5, 2017 at 4:55 PM, Biplob Biswas 
wrote:

> How are you determining your data is stale? Also if you want to know the
> key,
> why don't you store the key in your state as well?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: LatencyMarker

2017-09-05 Thread Tzu-Li (Gordon) Tai
Hi!

Yes, backpressure should also increase the latency value calculated from
LatencyMarkers.
LatencyMarkers are special events that flow along with the actual stream
records, so they should also be affected by backpressure.

Are you asking because you observed otherwise?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: State Maintenance

2017-09-05 Thread Tzu-Li (Gordon) Tai
Hi Navneeth,

Answering your three questions separately:

1. Yes. Your MapState will be backed by RocksDB, so when removing an entry
from the map state, the state will be removed from the local RocksDB as
well.

2. If state classes are not POJOs, they will be serialized by Kryo, unless a
custom serializer is specifically specified otherwise. You can take a look
at this document on how to do that [1].

3. I might need to know more information to be able to suggest properly for
this one. How are you using the "huge state values"? From what you
described, it seems like you only need it on one of the parallel instances,
so I'm a bit curious on what they are actually used for. Are they needed
when processing your records?

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to flush all window states after Kafka (0.10.x) topic was removed

2017-09-05 Thread Tzu-Li (Gordon) Tai
Hi Tony,

Currently, the functionality that you described does not exist in the
consumer. When a topic is deleted, as far as I know, the consumer would
simply consider the partitions as unreachable and continue to try fetching
records from them until they are up again.
I'm not entirely sure if a removed topic is distinguishable from a
temporarily out-of-service partition due to Kafka brokers being down in the
Kafka API, may need to take a look.

As for the "workaround" that you are using at the moment, you can actually
use `KeyedDeserializationSchema#isEndOfStream` for that. When that returns
true and the source subtask closes, the Long.MAX_VALUE watermark will be
emitted.
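For illustration, a minimal sketch of such a schema, assuming an application-defined "__END__" marker value (the marker and class name below are made up):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TerminatingStringSchema implements KeyedDeserializationSchema<String> {

    // assumed application-defined termination marker
    private static final String TERMINATION_MARKER = "__END__";

    @Override
    public String deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) throws IOException {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // once this returns true, the consumer subtask closes and the
        // Long.MAX_VALUE watermark is emitted
        return TERMINATION_MARKER.equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}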

Cheers,
Gordon

On Tue, Sep 5, 2017 at 2:50 PM, Tony Wei  wrote:

> Hi,
>
> I have a simple streaming job consuming data from Kafka and use time
> window to aggregate them.
> I am wondering if there is a built-in function to send a max watermark
> when consumer find this topic is not available, so that the window function
> can flush all state to the sink function.
>
> My Kafka version is 0.10.x. Currently, the workaround to me is using
> `TimestampAssigner` to check a specific record as termination message, and
> make the watermark be Long.MAX_VALUE.
> I will send this message to all partitions before I remove that topic.
>
> I would appreciate if anyone has some suggestions. Thank you.
>
> Best Regards,
> Tony Wei
>


Re: Avro Serialization and RocksDB Internal State

2017-08-18 Thread Tzu-Li (Gordon) Tai
Hi Biplob,

Yes, your assumptions are correct [1]. To be a bit more exact, the 
`AvroSerializer` will be used to serialize your POJO data types.
That would be the case for data transfers and state serialization (unless for 
state serialization you specify a custom state serializer; see [2])

If not is there a way to register my objects (sometimes not exactly a POJO 
as recognized by Flink) with the avro serializers? 
If your objects are not recognized as POJO (in which case the 
`GenericTypeInformation` will be extracted instead of `PojoTypeInformation`), 
there’s also an alternative way to serialize to RocksDB using Avro.

1. You can simply register custom serializers for that type [3].
2. Use custom state serializers for that specific registered state. Please see 
[2] for instructions on how to do that.
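For option 1, the registration itself looks roughly like this (the type and its Kryo serializer below are made-up placeholders; an Avro-backed Serializer implementation could be registered in exactly the same way):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerRegistrationSketch {

    // a made-up type standing in for the non-POJO type in question
    public static class MyNonPojoType {
        public String payload;
    }

    // a made-up Kryo serializer for it
    public static class MyNonPojoTypeSerializer extends Serializer<MyNonPojoType> {
        @Override
        public void write(Kryo kryo, Output output, MyNonPojoType object) {
            output.writeString(object.payload);
        }

        @Override
        public MyNonPojoType read(Kryo kryo, Input input, Class<MyNonPojoType> type) {
            MyNonPojoType value = new MyNonPojoType();
            value.payload = input.readString();
            return value;
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // option 1: register the custom serializer for the type
        env.getConfig().registerTypeWithKryoSerializer(MyNonPojoType.class, MyNonPojoTypeSerializer.class);
    }
}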

Hope this helps!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#serialization-of-pojo-types
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program


On 15 August 2017 at 4:36:27 PM, Biplob Biswas (revolutioni...@gmail.com) wrote:

Hi,  

This is somewhat related to my previous query here:  
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Evolving-serializers-and-impact-on-flink-managed-states-td14777.html
  

I was exploring Avro Serialization and in that regard I enabled the force  
use of avro using,  

env.getConfig().enableForceAvro();  

Now, my assumption is all internal data transfers will be done using the  
Avro serde, what I also assume is that on my RocksDB state backend my  
objects would also be stored after serializing with avro.  

*Is this assumption correct? *  

If not is there a way to register my objects (sometimes not exactly a POJO  
as recognized by Flink) with the avro serializers?  

I want to de-/serialize my objects using avro and store it on my RocksDB  
state backend, but I am not really aware how can I verify the serializer  
which is used to perform that.  

Thanks and Regards,  
Biplob  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Avro-Serialization-and-RocksDB-Internal-State-tp14912.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: [EXTERNAL] Re: Fink application failing with kerberos issue after running successfully without any issues for few days

2017-08-17 Thread Tzu-Li (Gordon) Tai
Hi Raja,

Can you please confirm if I have to use the below settings to ensure I use 
keytabs?

 

security.kerberos.login.use-ticket-cache:
Indicates whether to read from your Kerberos ticket cache (default: true).

 

security.kerberos.login.keytab:
Absolute path to a Kerberos keytab file that contains the user credentials.

 

security.kerberos.login.principal:
Kerberos principal name associated with the keytab.

 

security.kerberos.login.contexts: A comma-separated list of login contexts to 
provide the Kerberos credentials to (for example, Client,KafkaClient to use the 
credentials for ZooKeeper authentication and for Kafka authentication).
Yes, these are the exact configs that you’ll need to set.


Also a quick question, once I make these changes to use keytabs instead of 
ticket cache, Is there any place in the logs I can check, were the setting I 
made are in use and the applications are not actually using again ticket cache 
again?

You should be able to find logs such as “Adding keytab  to the AM 
container …” at the beginning of the job submission.


Cheers,
Gordon
On 18 August 2017 at 5:51:57 AM, Raja.Aravapalli (raja.aravapa...@target.com) 
wrote:

 

Thanks a lot Eron…

 

If I am understanding you correct, you suggest using keytabs to launch 
streaming applications!

 

Can you please confirm if I have to use the below settings to ensure I use 
keytabs?

 

security.kerberos.login.use-ticket-cache:
Indicates whether to read from your Kerberos ticket cache (default: true).

 

security.kerberos.login.keytab:
Absolute path to a Kerberos keytab file that contains the user credentials.

 

security.kerberos.login.principal:
Kerberos principal name associated with the keytab.

 

security.kerberos.login.contexts: A comma-separated list of login contexts to 
provide the Kerberos credentials to (for example, Client,KafkaClient to use the 
credentials for ZooKeeper authentication and for Kafka authentication).
 

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#kerberos-based-security-1

 

 

Also a quick question, once I make these changes to use keytabs instead of 
ticket cache, Is there any place in the logs I can check, were the setting I 
made are in use and the applications are not actually using again ticket cache 
again?

 

Thanks a lot, in advance.

 

 

Regards,

Raja.

 

From: Eron Wright 
Date: Thursday, August 17, 2017 at 1:06 PM
To: Ted Yu 
Cc: Raja Aravapalli , "user@flink.apache.org" 

Subject: Re: [EXTERNAL] Re: Fink application failing with kerberos issue after 
running successfully without any issues for few days

 

Raja,

According to those configuration values, the delegation token would be 
automatically renewed every 24 hours, then expire entirely after 7 days.   You 
say that the job ran without issue for 'a few days'.  Can we conclude that the 
job hit the 7-day DT expiration?

 

Flink supports the use of Kerberos keytabs as an alternative to delegation 
tokens for exactly this reason, that delegation tokens eventually expire and so 
aren't useful to a long-running program.   Consider making use of keytabs here.

 

Hope this helps!

-Eron

 

 

On Thu, Aug 17, 2017 at 9:58 AM, Ted Yu  wrote:

I think this needs to be done by the admin.

 

On Thu, Aug 17, 2017 at 9:37 AM, Raja.Aravapalli  
wrote:

 

I don’t have access to the site.xml files, it is controlled by a support team.

 

Does flink has any configuration settings or api’s thru which we can control 
this ?

 

 

Regards,

Raja.

 

From: Ted Yu 
Date: Thursday, August 17, 2017 at 11:07 AM
To: Raja Aravapalli 
Cc: "user@flink.apache.org" 
Subject: Re: [EXTERNAL] Re: Fink application failing with kerberos issue after 
running successfully without any issues for few days

 

Can you try shortening renewal interval to something like 2880 ?

 

Cheers

 

On Thu, Aug 17, 2017 at 8:58 AM, Raja.Aravapalli  
wrote:

Hi Ted,

 

Below is what I see in the environment:

 

dfs.namenode.delegation.token.max-lifetime:  60480

dfs.namenode.delegation.token.renew-interval:  8640

 

 

Thanks.

 

 

Regards,

Raja.

 

From: Ted Yu 
Date: Thursday, August 17, 2017 at 10:46 AM
To: Raja Aravapalli 
Cc: "user@flink.apache.org" 
Subject: [EXTERNAL] Re: Fink application failing with kerberos issue after 
running successfully without any issues for few days

 

What are the values for the following parameters ?

 

dfs.namenode.delegation.token.max-lifetime

 

dfs.namenode.delegation.token.renew-interval

 

Cheers

 

On Thu, Aug 17, 2017 at 8:24 AM, Raja.Aravapalli  
wrote:

Hi Ted,

 

Find below the configuration I see in 

Re: Efficient grouping and parallelism on skewed data

2017-08-17 Thread Tzu-Li (Gordon) Tai
Hi John,

Do you need to do any sort of grouping on the keys and aggregation? Or are you 
simply using Flink to route the Kafka messages to different Elasticsearch 
indices?

For the following I’m assuming the latter:
If there’s no need for aggregate computation per key, what you can simply 
do is pipeline the input stream directly to the Elasticsearch sink.
The Flink Elasticsearch Sink API allows you to request each individual incoming 
record to a different index.
If you want to have more Elasticsearch sink instances for a specific id, what 
you can do is split the stream, splitting out ids that you know to have higher 
throughput, and pipeline that split stream to an Elasticsearch Sink with higher 
parallelism.
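As an illustration of the per-id index routing (a sketch only; the Message type and the index naming scheme are assumptions):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

public class PerIdElasticsearchSinkFunction
        implements ElasticsearchSinkFunction<PerIdElasticsearchSinkFunction.Message> {

    // hypothetical message type with an id in 1..100 and a payload
    public static class Message {
        public int id;
        public String payload;
    }

    @Override
    public void process(Message element, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, Object> json = new HashMap<>();
        json.put("payload", element.payload);

        // each record is requested against the index that corresponds to its id
        IndexRequest request = Requests.indexRequest()
                .index("index-" + element.id)
                .type("my-type")
                .source(json);

        indexer.add(request);
    }
}

For the skewed ids, the same function can be reused behind a split()/select() of the stream, with a higher setParallelism() on the ElasticsearchSink that handles the heavy ids.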

Gordon

On 18 August 2017 at 11:06:02 AM, Jakes John (jakesjohn12...@gmail.com) wrote:

Can someone help me figure out how to implement this in Flink. 

I have to create a pipeline Kafka->flink->elasticsearch. I have high throughput 
data coming into Kafka. All messages in Kafka have a key called 'id' and value 
is an integer that ranges from 1 to N. N is dynamic with a max value of 100.  The 
number of messages across different ids is drastically different. For example, 
the number of incoming messages with id 10 can be 500 times the number of incoming 
messages with id 11.   
One requirement is that messages with a particular id has to be written to a 
corresponding elasticsearch index. Eg. Messages with id 1 is written to 
elasticsearch index 1, Messages with id 2 is written to elasticsearch index 2 
and so on. ... In other words, there will be 100 elasticsearch indices at most.

I have the control over Kafka. I can make sure that messages are written to a 
single topic or messages are separately written to different topics based on 
their ids.  The only requirement is that messages are written to indices that 
correspond to the ids.

1. What are the possible ways that I can achieve this in Flink? 
2. If I use a single kafka topic and a single flink job,  what is the best way 
to group ids in this case and set parallelism according to the distribution of 
data? The parallelism required to write into ES is going to be different for 
different ids(as i said earlier, distribution of data across ids are 
drastically different).
3. Having a Kafka topic per id and a topology per id looks ugly and too 
resource intensive. There are some ids that have very little data. What is 
the best way to do this if we were to choose this option?



Re:Re:Re:Re:Re:How to verify the data to Elasticsearch whether correct or not ?

2017-08-17 Thread Tzu-Li (Gordon) Tai
Hi,

I see what you were asking about now.

Yes, it doesn’t make sense to sink an object to Elasticsearch. You either need 
to transform the object to a JSON using libraries like Protobuf / Jackson / 
etc., or disintegrate it yourself into a Map.

One thing I noticed is:
json.put("json", JsonFormat.printToString(element))
If what you want is all fields in this object to be able to be queried in 
Elasticsearch, what you could do is simply:

`Requests.indexRequest().index(...).type(…).source()`

This would work fine, and you can also save the extra redundant layer in your 
sinked JSON.

On 17 August 2017 at 11:23:15 AM, mingleizhang (18717838...@163.com) wrote:

Ahhh. Sorry Ted. I didn't see the code was broken. Yep, I will directly put the 
text code here. 

Dependency is 

  com.googlecode.protobuf-java-format
  protobuf-java-format
  1.2

And the added code is like the following. This time, although I sink an object to 
Elasticsearch, I convert it to JSON by JsonFormat.printToString(element). And 
it solves my issue, as I can get my data from Elasticsearch represented as a 
JSON string, which I can then use to show my data in a front end. 
// sink the filtered data to ElasticSearch
clickStreamFiltered.addSink(new 
ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress, new 
ElasticsearchSinkFunction[ActivityInfo] {
  def createIndexRequest(element: ActivityInfo): IndexRequest = {
val json = new java.util.HashMap[String, AnyRef]
json.put("activityInfo", element)
json.put("mid", element.getMid)
json.put("activity", element.getActivity)
json.put("json", JsonFormat.printToString(element))

Requests.indexRequest().index("filter_event_tracking").`type`("my-type-2").source(json)
  }
  override def process(activityInfo: ActivityInfo, runtimeContext: 
RuntimeContext, requestIndexer: RequestIndexer): Unit = {
requestIndexer.add(createIndexRequest(activityInfo))
  }
}))
Peace.
zhangminglei / mingleizhang


At 2017-08-17 09:21:47, "Ted Yu" <yuzhih...@gmail.com> wrote:
Did you use image for the code ?
Can you send plain code again ?
Cheers

 Original message 
From: mingleizhang <18717838...@163.com>
Date: 8/16/17 6:16 PM (GMT-08:00)
To: mingleizhang <18717838...@163.com>
Cc: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>, user@flink.apache.org
Subject: Re:Re:Re:Re:How to verify the data to Elasticsearch whether correct or 
not ?

I solved the issue by adding a dependency that convert the protobuf objects 
into a JSON. By adding a line of code like below:  element is a PB object.



Thanks.
zhangminglei



At 2017-08-16 22:52:30, "mingleizhang" <18717838...@163.com> wrote:
I looked into the sinked data in Elasticsearch. The good news is that I can find it 
right there. But the data I sinked is an object, and 
Elasticsearch represents it as a string. I put the related code below.

The element type is ActivityInfo. Then I wrote a Java API to read the data, but the 
value is a string instead. I want it represented as an ActivityInfo object, 
but it didn't work the way I wanted.

Can anybody give me some advice for it ? Thank you very much!








Thanks
zhangminglei / mingleizhang



At 2017-08-16 20:52:34, "mingleizhang" <18717838...@163.com> wrote:

Hi, Gordon.

      I am not sure about this. As far as I know, Elasticsearch often stores 
JSON data inside it, as that is convenient for creating its index. As for my 
code below, I stored the protobuf objects (ActivityInfo, built from the 
activityinfo.proto file) in Elasticsearch, and what is stored in it is binary 
data, which feels very strange to me. The Flink documentation only gives an 
example whose data type is a JSON string.

Peace,
Zhangminglei




At 2017-08-16 13:27:10, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
Hi,

I couldn’t spot anything off in the code snippet you provided. So you should be 
ok with this :)

Cheers,
Gordon


On 15 August 2017 at 9:18:59 PM, mingleizhang (18717838...@163.com) wrote:

BTW, ActivityInfo is a PB object built from xxx.proto, and it already has its 
values set.




At 2017-08-15 21:17:00, "mingleizhang" <18717838...@163.com> wrote:
Hi, flink experts!

I sinked my data (PB objects) to Elasticsearch. I don't know whether the 
sinked data is correct or not. The code is like the following; could you help 
me check it please? I'm not familiar with ES. Now, I want to install Kibana to 
view my data, but I don't know whether the code below is correct or not. I ran 
the Flink program and it does not give me an error. I just want to confirm.


// sink the filtered data to ElasticSearch
clickStreamFiltered.addSink(new 
ElasticsearchSink[ActivityInfo](configElasticSearch, transportAddress, new 
ElasticsearchSinkFunction[ActivityInfo] {
  def createIndexRequest(element: ActivityInf

Re: Error during Kafka connection

2017-08-14 Thread Tzu-Li (Gordon) Tai
Hi,

I don’t have experience running Kafka clusters behind proxies, but it seems 
like the configurations “advertised.host.name” and “advertised.port” for your 
Kafka brokers are what you’re looking for.
For information on that please refer to the Kafka documentations.

Cheers,
Gordon


On 12 August 2017 at 4:28:41 PM, AndreaKinn (kinn6...@hotmail.it) wrote:

Is it solvable? I'm not an expert in this stuff and the cluster is managed by  
the person responsible for the lab. Maybe I can ask him to do something in order to solve it.  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-tp14822p14852.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Error during Kafka connection

2017-08-11 Thread Tzu-Li (Gordon) Tai
No, there should be no difference between setting it up on Ubuntu or OS X.

I can’t really tell any anything suspicious from the information provided so 
far, unfortunately.
Perhaps you can try first checking that the Kafka topic is consumable from 
where you’re running Flink, e.g. using the example console consumer / producers?


On 11 August 2017 at 7:06:46 PM, AndreaKinn (kinn6...@hotmail.it) wrote:

the kafka version I use is the latest (0.11.0.0). But to be honest, I also  
use 0.11.0.0 locally and in that case it works correctly. Anyway, the latest  
kafka connector in flink is designed for kafka 0.10.x.x  

I use OS X locally and Ubuntu on the cluster. Does that matter?  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-tp14822p14824.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Aggregation based on Timestamp

2017-08-11 Thread Tzu-Li (Gordon) Tai
Hi,

Yes, this is definitely doable in Flink, and should be very straightforward.

Basically, what you would do is define a FlinkKafkaConsumer source for your 
Kafka topic [1], following that a keyBy operation on the hostname [2], and then 
a 1-minute time window aggregation [3]. At the end of your pipeline would be a 
InfluxDB sink. There isn’t one out of the box, but it should be fairly easy to 
implement.
If you want deterministic results based on event-time processing, that is also 
possible [4].
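To make that concrete, a rough sketch of such a job (the line-protocol parsing and the InfluxDB sink are placeholders and would need real implementations):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class TelegrafAggregationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "telegraf-aggregator");     // placeholder

        DataStream<String> lines =
                env.addSource(new FlinkKafkaConsumer010<>("telegraf", new SimpleStringSchema(), props));

        lines
                // assumed parsing: take the first tag value as the key (e.g. the host tag)
                // and the first field value as the measurement
                .map(new MapFunction<String, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(String line) {
                        String key = line.split(",")[1].split("=")[1];
                        String field = line.split(" ")[1].split("=")[1];
                        return Tuple2.of(key, Double.parseDouble(field));
                    }
                })
                .keyBy(0)                       // key by hostname
                .timeWindow(Time.minutes(1))    // aggregate to 1-minute intervals
                .sum(1)                         // or a custom reduce/fold, e.g. for averages
                .addSink(new RichSinkFunction<Tuple2<String, Double>>() {
                    @Override
                    public void open(Configuration parameters) {
                        // create the InfluxDB client here (omitted)
                    }

                    @Override
                    public void invoke(Tuple2<String, Double> value) {
                        // write the aggregated point to InfluxDB here (omitted)
                    }
                });

        env.execute("telegraf-1m-aggregation");
    }
}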

Just throwing you links to get started here :) Let us know if you have more 
problems getting started.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#datastream-transformations
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html
[4] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html

On 10 August 2017 at 8:52:25 PM, Madhukar Thota (madhukar.th...@gmail.com) 
wrote:

Hi

We have a use case where we have thousands of Telegraf agents sending data to 
kafka (some of them are sending at 10s, 15s, and 30s intervals). 
We would like to aggregate the incoming data to a 1-minute interval based on the 
hostname as key before we write into influxdb. Is it possible to do this type 
of use case with Flink? If so, any sample to get started?

sample data ( influxdb line protocal) coming from Kafka

weather,location=us-midwest,season=summer temperature=82 1465839830100400200

-Madhu

Re: Error during Kafka connection

2017-08-11 Thread Tzu-Li (Gordon) Tai
Hi,

AFAIK, Kafka group coordinators are supposed to always be marked dead, because 
we use static assignment internally and therefore Kafka's group coordination 
functionality is disabled.

Though it may be obvious, just to get that out of the way first: are you sure 
that the Kafka installation version matches (i.e. 0.10.0.1)?

Cheers,
Gordon

On 11 August 2017 at 6:43:51 PM, AndreaKinn (kinn6...@hotmail.it) wrote:

Hi, 
In the last week I have correctly deployed a flink program which gets data 
from a kafka broker on my local machine. 
Now I'm trying to do the same thing but moving the kafka broker to a 
cluster. 

I didn't change any line of code, I report it here: 

DataStream> stream = env 
.addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), 
properties)) 
.assignTimestampsAndWatermarks(new CustomTimestampExtractor()) 
.keyBy(0); 

I have only changed the Kafka IP. 
The data model obviously has not changed. 
Unfortunately now when I start Flink program I get this: 

INFO org.apache.kafka.common.utils.AppInfoParser - Kafka 
version : 0.10.0.1 
12:30:48,446 INFO org.apache.kafka.common.utils.AppInfoParser 
- Kafka commitId : a7a17cdec9eaa6c5 
12:30:48,625 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Discovered coordinator giordano-1-4-200:9092 (id: 2147483647 rack: null) for 
group groupId. 
12:30:48,626 INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - *Marking 
the coordinator giordano-1-4-200:9092 (id: 2147483647 rack: null) dead for 
group groupId* 

I bolded the line that worries me. 

Then, no data is retrieved from Kafka although flink continues to perform 
checkpointing etc. normally... 

Any ideas? 




-- 
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-tp14822.html
 
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com. 


Re: load + update global state

2017-08-07 Thread Tzu-Li (Gordon) Tai
Hi Peter!

One thing I’d like to understand first after reading about your use case:
Why exactly do you need the lookup table to be globally accessible? From what I 
understand, you are using this lookup table for stream event enriching, so 
whatever processing you need to perform downstream on this enriched stream, you 
would already have the corresponding information for each session attached.

Regarding a solution for efficient stream enriching in your case:
In your case, the enrichment data comes from the input events itself, so it can 
be fairly straightforward: use a MapFunction that keeps the lookup table as 
managed keyed state [1].
By using RocksDB as your state backend [2], the table would not be backed by 
memory and therefore your state size is only bounded by disk size. Each state 
access would be bound to the current processed key (i.e., in your case session 
id, meaning that you’d only be accessing the emails set of that session).
Using RocksDB as your state backend, each state access and update would require 
de-/serialization (of the state of a single key), but that would always be 
local access and in general would outperform remotely looking up an external 
store.

So, to wrap this up, the answers to your doubts, when using Flink, would be:

(1) load the state as a whole from the data store into memory is a huge burn of 
memory (also making changes cluster-wide visible is an issue) 
Apart from the “cluster-wide visibility” aspect which needs to be clarified, 
you can use RocksDB as the state backend to back the state and not keep the 
state in memory.

(2) not loading into memory but using something like cassandra / redis as a 
lookup store would certainly work but introduces a lot of network requests 
(possible ideas: use a distributed cache? broadcast updates in flink cluster?) 
Remote lookup is not required, if you keep the lookup store as managed keyed 
state in Flink. All session lookup would be local state access. You can think 
of it as you’re basically setting up a K-V store within Flink that is always 
co-partitioned by session id with your incoming events.

(3) how should I integrate the changes to the table with flink's checkpointing? 
Simply by registering managed keyed state. Flink will handle checkpointing that 
for fault tolerance for you, and ensuring exactly-once. The “Working with 
State" docs hopefully should cover that quite well!


Hope this helps :)

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-keyed-state
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/state_backends.html#the-rocksdbstatebackend


On 8 August 2017 at 3:00:57 AM, Peter Ertl (peter.e...@gmx.net) wrote:

Hi folks,  

I am coding a streaming task that processes http requests from our web site and 
enriches these with additional information.  

It contains session ids from historic requests and the related emails that were 
used within these sessions in the past.  


lookup - hashtable: session_id: String => emails: Set[String]  


During processing of these NEW http requests  

- the lookup table should be used to get previous emails and enrich the current 
stream item  
- new candidates for the lookup table will be discovered during processing of 
these items and should be added to the lookup table (also these changes should 
be visible through the cluster)  

I see at least the following issues:  

(1) load the state as a whole from the data store into memory is a huge burn of 
memory (also making changes cluster-wide visible is an issue)  

(2) not loading into memory but using something like cassandra / redis as a 
lookup store would certainly work but introduces a lot of network requests 
(possible ideas: use a distributed cache? broadcast updates in flink cluster?)  

(3) how should I integrate the changes to the table with flink's checkpointing? 
 

I really don't get how to solve this best and my current solution is far from 
elegant  

So is there any best practice for supporting "large lookup tables that change 
during stream processing" ?  

Cheers  
Peter  






Re: Flink streaming Parallelism

2017-08-07 Thread Tzu-Li (Gordon) Tai
Hi,

The equivalent would be setting a parallelism on your sink operator. e.g. 
stream.addSink(…).setParallelism(…).
By default the parallelism of all operators in the pipeline will be whatever 
parallelism was set for the whole job, unless parallelism is explicitly set for 
a specific operator. For more details on the distributed runtime concepts you 
can take a look at [1]

        I saw the implementation of elasticsearch sink in Flink which can do 
batching of messsges before writes. How can I batch data based on a custom 
logic? For eg: batch writes  grouped on one of the message keys.  This is 
possible in Storm via FieldGrouping.
The equivalent of partitioning streams in Flink is `stream.keyBy(…)`. All 
messages of the same key would then go to the same parallel downstream operator 
instance. If it's an ElasticsearchSink, then following a keyBy all messages of 
the same key will be batched by the same Elasticsearch writer.
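
Putting the two together, a rough sketch (topic name, key extraction and the Elasticsearch sink factory below are placeholders for whatever you use, not a tested setup):

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "broker:9092");
kafkaProps.setProperty("group.id", "es-writer");

DataStream<String> messages = env
        .addSource(new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), kafkaProps))
        .setParallelism(4);                      // fewer Kafka reader subtasks

messages
        .keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String msg) {
                return extractRoutingKey(msg);   // hypothetical helper: the "field grouping" key
            }
        })
        .addSink(buildElasticsearchSink())       // hypothetical factory returning an ElasticsearchSink
        .setParallelism(12);                     // more ES writer subtasks than Kafka readers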

Hope this helps!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html


On 8 August 2017 at 8:58:30 AM, Jakes John (jakesjohn12...@gmail.com) wrote:

       I am coming from the Apache Storm world.  I am planning to switch from Storm 
to Flink. I was reading the Flink documentation, but I couldn't find some 
requirements in Flink which were present in Storm.  

I need to have a streaming pipeline  Kafka->flink-> ElasticSearch.  In storm,  
I have seen that I can specify number of tasks per bolt.  Typically databases 
are slow in writes and hence I need more writers to the database.  Reading from 
kafka is pretty fast when compared to ES writes.  This means that I need to 
have more ES writer tasks than Kafka consumers. How can I achieve it in Flink?  
What are the concepts in Flink similar to Storm Parallelism concepts like 
workers, executors, tasks?
        I saw the implementation of elasticsearch sink in Flink which can do 
batching of messages before writes. How can I batch data based on a custom 
logic? For eg: batch writes  grouped on one of the message keys.  This is 
possible in Storm via FieldGrouping. But I couldn't find an equivalent way to 
do grouping in Flink and control the overall number of writes to ES.

Please help me with above questions and some pointers to flink parallelism. 





Re: KafkaConsumerBase

2017-08-02 Thread Tzu-Li (Gordon) Tai
Hi!

method shown in KafkaConsumerBase.java (version 1.2.0) 

A lot has changed in the FlinkKafkaConsumerBase since version 1.2.0.
And if I remember correctly, the `assignPartitions` method was actually a no 
longer relevant method used in the code, and was properly removed afterwards.
The method for partition assigning in 1.2.0 is called `assignTopicPartitions`, 
and is used in the open() method.

consumerCallBridge.assignPartitions(consumer, 
convertKafkaPartitions(subscribedPartitions)); 

i think here subscribedPartitions is all the partitions , not 
subtaskPartitions.
This code snippet is from `KafkaConsumerThread`, correct?

As stated above, the partitions are still filtered out to only be the 
partitions for each local subtask, using the `assignTopicPartitions` method. So 
here, the `subscribedPartitions` is not the complete list of partitions, only 
the partitions that the subtask should subscribe to.


Cheers,
Gordon

On 2 August 2017 at 9:52:03 PM, aitozi (gjying1...@gmail.com) wrote:


Hi,  

I have a question: when we use KafkaConsumerBase, we will have to  
fetch data from different partitions  
in different parallel threads, like the method shown in  
KafkaConsumerBase.java (version 1.2.0)  

protected static List<KafkaTopicPartition> assignPartitions(
        List<KafkaTopicPartition> allPartitions,
        int numConsumers, int consumerIndex) {
    final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
            allPartitions.size() / numConsumers + 1);

    for (int i = 0; i < allPartitions.size(); i++) {
        if (i % numConsumers == consumerIndex) {
            thisSubtaskPartitions.add(allPartitions.get(i));
        }
    }

    return thisSubtaskPartitions;
}

but I have not found any place that invokes this method. In  
KafkaConsumerThread.java it uses  

consumerCallBridge.assignPartitions(consumer,  
convertKafkaPartitions(subscribedPartitions));  

I think here subscribedPartitions is all the partitions, not the  
subtask partitions. Can anyone address my problem?  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/KafkaConsumerBase-tp14636.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: multiple users per flink deployment

2017-08-02 Thread Tzu-Li (Gordon) Tai
Hi,

There’s been quite a few requests on this recently on the mailing lists and 
also mentioned by some users offline, so I think we may need to start with 
plans to probably support this.
I’m CC’ing Eron to this thread to see if he has any thoughts on this, as he was 
among the first authors driving the Kerberos support in Flink.
I’m not really sure if such a feature support makes sense, given that all jobs 
of a single Flink deployment have full privileges and therefore no isolation in 
between.

Related question: what external service are you trying to authenticate to with 
different users?
If it is Kafka and perhaps you have different users for the consumer / 
producer, that will be very soon available in 1.3.2, which includes a version 
bump to Kafka 0.10 that allows multiple independent users within the same JVM 
through dynamic JAAS configuration.
See this mail thread [1] for more detail on that.

Cheers,
Gordon

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-0-10-jaas-multiple-clients-td12831.html#a13317

On 1 August 2017 at 6:16:08 PM, Georg Heiler (georg.kf.hei...@gmail.com) wrote:

Hi,

flink currently only seems to support a single kerberos ticket for deployment. 
Are there plans to support different users per each job?

regards,
Georg

Re: About KafkaConsumerBase

2017-08-01 Thread Tzu-Li (Gordon) Tai
Hi,

it maintains itself an individual instance of 
FlinkKafkaConsumerBase, and it contains an individual pendingOffsetsToCommit, 
am I right? 
That is correct! The FlinkKafkaConsumerBase is code executed for each parallel 
subtask instance, and therefore have their own pendingOffsetsToCommit which 
would not be manipulated / accessed concurrently.

The only places where that map is accessed is in the snapshotState and 
notifyCheckpointComplete method, which I think is guaranteed to not be 
concurrently called.

Cheers,
Gordon


On 2 August 2017 at 1:02:57 PM, aitozi (gjying1...@gmail.com) wrote:


Hi,Piotr Nowojski  

I think you are right, but I think it is executed in parallel, and in  
each parallel subtask it maintains itself an individual instance of  
FlinkKafkaConsumerBase, which contains an individual pendingOffsetsToCommit,  
am I right?  

thanks, aitozi  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-KafkaConsumerBase-tp14601p14619.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Fink: KafkaProducer Data Loss

2017-07-31 Thread Tzu-Li (Gordon) Tai
Hi!

Thanks a lot for providing this.
I'll try to find some time this week to look into this using your example code.

Cheers,
Gordon

On 29 July 2017 at 4:46:57 AM, ninad (nni...@gmail.com) wrote:

Hi Gordon, I was able to reproduce the data loss on a standalone Flink cluster 
as well. I have a stripped-down version of our code here: 

Environment: 
Flink standalone 1.3.0 
Kafka 0.9 

*What the code is doing:* 
-consume messages from kafka topic ('event.filter.topic' property in 
application.properties) 
-group them by key 
-analyze the events in a window and filter some messages. 
-send remaining messages to kafka topc ('sep.http.topic' property in 
application.properties) 

To build: 
./gradlew clean assemble 

The jar needs path to 'application.properties' file to run 

Important properties in application.properties: 
window.session.interval.sec 
kafka.brokers 
event.filter.topic --> source topic 
sep.http.topic --> destination topic 

To test: 
-Use 'EventGenerator' class to publish messages to source kafka topic 
The data published won't be filtered by the logic. If you publish 10 
messages to the source topic, 
those 10 messages should be sent to the destination topic. 

-Once we see that flink has received all the messages, bring down all kafka 
brokers 

-Let Flink jobs fail for 2-3 times. 

-Restart kafka brokers. 

Note: Data loss isn't observed frequently. 1/4 times or so. 

Thanks for all your help. 

eventFilter.zip 

 









-- 
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p14522.html
 
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com. 


Re: write into hdfs using avro

2017-07-27 Thread Tzu-Li (Gordon) Tai
Hi!

Yes, you can provide a custom writer for the BucketingSink via 
BucketingSink#setWriter(…).
The AvroKeyValueSinkWriter is a simple example of a writer that uses Avro for 
serialization, and takes as input KV 2-tuples.
If you want to have a writer that takes as input your own event types, AFAIK 
you’ll need to implement your own Writer.
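
If wrapping the values is acceptable, one (untested) sketch that reuses AvroKeyValueSinkWriter is to map each value into a Tuple2 with a simple key; `MyEvent` below stands for a hypothetical Avro-generated class and `getId()` for some field of it (classes come from org.apache.flink.streaming.connectors.fs, org.apache.flink.streaming.connectors.fs.bucketing, org.apache.flink.api.java.tuple and org.apache.avro):

Map<String, String> writerProps = new HashMap<>();
writerProps.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA,
        Schema.create(Schema.Type.STRING).toString());
writerProps.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA,
        MyEvent.getClassSchema().toString());

// Wrap each value with a key, since AvroKeyValueSinkWriter expects Tuple2<K, V> records.
DataStream<Tuple2<String, MyEvent>> keyedValues = events.map(
        new MapFunction<MyEvent, Tuple2<String, MyEvent>>() {
            @Override
            public Tuple2<String, MyEvent> map(MyEvent e) {
                return Tuple2.of(e.getId().toString(), e);
            }
        });

BucketingSink<Tuple2<String, MyEvent>> sink = new BucketingSink<>("hdfs:///base/path");
sink.setWriter(new AvroKeyValueSinkWriter<String, MyEvent>(writerProps));

keyedValues.addSink(sink);

Otherwise, implementing your own Writer that takes your event type directly and passing it to BucketingSink#setWriter(...) is the cleaner option.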

Cheers,
Gordon

On 21 July 2017 at 7:31:21 PM, Rinat (r.shari...@cleverdata.ru) wrote:

Hi, folks !

I’ve got a little question. I’m trying to save a stream of events from Kafka into 
HDFS using org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink 
with AVRO serialization. 
If I properly understood, I should use some implementation of 
org.apache.flink.streaming.connectors.fs.Writer for this purposes.

I found an existing implementation of an Avro writer, 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter, but my 
stream contains only values. 
What do I need to do if I want to write values from the stream using a BucketingSink 
in Avro format?

Thx.

Re: FlinkKafkaConsumer010 - Memory Issue

2017-07-25 Thread Tzu-Li (Gordon) Tai
Hi,

I couldn’t seem to reproduce this.

Taking another look at your description, one thing I spotted was that your 
Kafka broker installation versions are 0.10.1.0, while the Kafka consumer uses 
Kafka clients of version 0.10.0.1 (by default, as shown in your logs).
I’m wondering whether or not that could be the root cause here.

Cheers,
Gordon

On 26 July 2017 at 12:14:02 AM, PedroMrChaves (pedro.mr.cha...@gmail.com) wrote:

My Flink version: 1.2.1 
Kafka consumer: 010 
Kafka version: 2_11_0.10.1.0-2

Re: FlinkKafkaConsumer subscribes to partitions in restoredState only.

2017-07-24 Thread Tzu-Li (Gordon) Tai
One other note:
the functionality is actually already merged to the master branch.
You can also take a look at the feature documentation here [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-partition-discovery
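
For reference once you are on 1.4, enabling that discovery should just be a consumer property; the key name below is taken from the linked docs, please double-check it against the version you end up using:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-group");
// Flink 1.4+: probe Kafka for new partitions of the subscribed topics every 10 seconds.
props.setProperty("flink.partition-discovery.interval-millis", "10000");

FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("topic.event.filter", new SimpleStringSchema(), props);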

On 25 July 2017 at 1:22:10 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

Hi,

Sorry for not replying to this earlier, it seems like this thread hadn’t been 
noticed earlier.

What you are experiencing is expected behavior. In Flink 1.3, new partitions 
will not be picked up, only partitions that are in checkpoints state will be 
subscribed to on restore runs.
One main reason for this is because partition discovery on restore runs does 
not work well with repartitionable list operator state.

This limitation will be removed in Flink 1.4, where the Flink Kafka Consumer 
will have the capability to discover partitions (even during processing without 
the need to restart the job).
Unfortunately, this is not available in Flink 1.3.

Cheers,
Gordon

On 13 July 2017 at 4:37:38 AM, ninad (nni...@gmail.com) wrote:

Hello,
We're noticing that FlinkKafkaConsumer subscribes to partitions in restored
state only. Thus, partitions which aren't in restored state aren't read. We
have to restart the job, for FlinkKafkaConsumer to read from all partitions.

Here are the details:

Environment:
Flink-1.3.0, standalone cluster as well as hadoop-cloudera cluster
flink-connector-kafka-0.9_2.11:1.3.0

-Start a job which reads from kafka topic.
-Bring down all kafka brokers
-Bring up kafka brokers

At this point, we see this in the logs:

*2017-07-12 19:53:23,661 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase -
Consumer subtask 0 will start reading 8 partitions with offsets in restored
state: {KafkaTopicPartition{topic='topic.event.filter', partition=0}=3,
KafkaTopicPartition{topic='topic.event.filter', partition=2}=18,
KafkaTopicPartition{topic='topic.event.filter', partition=8}=1,
KafkaTopicPartition{topic='topic.event.filter', partition=9}=-1,
KafkaTopicPartition{topic='topic.event.filter', partition=3}=17,
KafkaTopicPartition{topic='topic.event.filter', partition=4}=17,
KafkaTopicPartition{topic='topic.event.filter', partition=5}=17,
KafkaTopicPartition{topic='topic.event.filter', partition=6}=1}*

Flink subscribes to only 8 partitions, because they are in recovery.
Remaining partitions aren't subscribed to.

From the code, I don't see a place where, the partitions in non-restored
state are being subscribed to.

Relevant code:

*if (restoredState != null) {
for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
if (restoredState.containsKey(kafkaTopicPartition)) {
subscribedPartitionsToStartOffsets.put(kafkaTopicPartition,
restoredState.get(kafkaTopicPartition));
}
}

LOG.info("Consumer subtask {} will start reading {} partitions with
offsets in restored state: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets);
}*

We're not setting 'setStartFromEarliest' or 'setStartFromLatest', so it's
using the default: 'setStartFromGroupOffsets'.

Are we missing any setting? Doing something wrong? Please let us know.
Thanks !






--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumer-subscribes-to-partitions-in-restoredState-only-tp14233.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: FlinkKafkaConsumer subscribes to partitions in restoredState only.

2017-07-24 Thread Tzu-Li (Gordon) Tai
Hi,

Sorry for not replying to this earlier, it seems like this thread hadn’t been 
noticed earlier.

What you are experiencing is expected behavior. In Flink 1.3, new partitions 
will not be picked up, only partitions that are in checkpoints state will be 
subscribed to on restore runs.
One main reason for this is because partition discovery on restore runs does 
not work well with repartitionable list operator state.

This limitation will be removed in Flink 1.4, where the Flink Kafka Consumer 
will have the capability to discover partitions (even during processing without 
the need to restart the job).
Unfortunately, this is not available in Flink 1.3.

Cheers,
Gordon

On 13 July 2017 at 4:37:38 AM, ninad (nni...@gmail.com) wrote:

Hello,  
We're noticing that FlinkKafkaConsumer subscribes to partitions in restored  
state only. Thus, partitions which aren't in restored state aren't read. We  
have to restart the job, for FlinkKafkaConsumer to read from all partitions.  

Here are the details:  

Environment:  
Flink-1.3.0, standalone cluster as well as hadoop-cloudera cluster  
flink-connector-kafka-0.9_2.11:1.3.0  

-Start a job which reads from kafka topic.  
-Bring down all kafka brokers  
-Bring up kafka brokers  

At this point, we see this in the logs:  

*2017-07-12 19:53:23,661 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase -  
Consumer subtask 0 will start reading 8 partitions with offsets in restored  
state: {KafkaTopicPartition{topic='topic.event.filter', partition=0}=3,  
KafkaTopicPartition{topic='topic.event.filter', partition=2}=18,  
KafkaTopicPartition{topic='topic.event.filter', partition=8}=1,  
KafkaTopicPartition{topic='topic.event.filter', partition=9}=-1,  
KafkaTopicPartition{topic='topic.event.filter', partition=3}=17,  
KafkaTopicPartition{topic='topic.event.filter', partition=4}=17,  
KafkaTopicPartition{topic='topic.event.filter', partition=5}=17,  
KafkaTopicPartition{topic='topic.event.filter', partition=6}=1}*  

Flink subscribes to only 8 partitions, because they are in recovery.  
Remaining partitions aren't subscribed to.  

From the code, I don't see a place where, the partitions in non-restored  
state are being subscribed to.  

Relevant code:  

*if (restoredState != null) {  
for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {  
if (restoredState.containsKey(kafkaTopicPartition)) {  
subscribedPartitionsToStartOffsets.put(kafkaTopicPartition,  
restoredState.get(kafkaTopicPartition));  
}  
}  

LOG.info("Consumer subtask {} will start reading {} partitions with  
offsets in restored state: {}",  
getRuntimeContext().getIndexOfThisSubtask(),  
subscribedPartitionsToStartOffsets.size(),  
subscribedPartitionsToStartOffsets);  
}*  

We're not setting 'setStartFromEarliest' or 'setStartFromLatest', so it's  
using the default: 'setStartFromGroupOffsets'.  

Are we missing any setting? Doing something wrong? Please let us know.  
Thanks !  






--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumer-subscribes-to-partitions-in-restoredState-only-tp14233.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: cannot use ElasticsearchSink in Flink1.3.0

2017-07-20 Thread Tzu-Li (Gordon) Tai
Hi,

There was an issue with release ES 5 in 1.3.0, and the artifacts were not 
released to Maven central.
Please use 1.3.1 instead.

Cheers,
Gordon


On 20 July 2017 at 3:31:39 PM, ZalaCheung (gzzhangdesh...@corp.netease.com) 
wrote:

Hi all,

I am using Flink 1.3.0 and following instructions here to add elasticsearch as 
a sink.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/elasticsearch.html

I follow the instruction to add the dependency like this:


<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
   <version>${flink.version}</version>
</dependency>
the flink version is 1.3.0.

When I try to write code to add es as sink, IntelliJ cannot resolve symbol for 
“ElasticsearchSink”.

result.addSink(new ElasticsearchSink<>(config, transportAddresses, new 
ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {

    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer 
indexer) {

    }
}));


The elasticsearch package doesn’t contain anything called “ElasticsearchSink”, it only 
has “ElasticsearchSinkFunction”

import org.apache.flink.streaming.connectors.elasticsearch.

Thanks,
Desheng Zhang
E-mail: gzzhangdesh...@corp.netease.com;



Re:Re: Problem with Flink restoring from checkpoints

2017-07-20 Thread Tzu-Li (Gordon) Tai
Hi,

What Fabian mentioned is true. Flink Kafka Consumer’s exactly-once guarantee 
relies on offsets checkpoints as Flink state, and doesn’t rely on the committed 
offsets in Kafka.

What we found is that Flink acks Kafka immediately before even writing to S3.

What you mean by ack here is the offset committing back to Kafka, correct?
First of all, as mentioned, this behavior is not related to exactly-once. In 
fact, in Flink 1.3, you can completely turn this off and still achieve 
exactly-once (with Flink checkpointing enabled).
One other thing to mention is that offsets are committed to Kafka when all 
sinks in the job complete their state snapshot.

What we found is that Flink acks Kafka immediately before even writing to S3. 
The consequence of this seems to be that if the job gets cancelled before the 
acked events are flushed to S3 then these are lost

So, at a first look on your description, this seems like a problem with the 
state snapshotting of the bucketing sink. This is suggesting that data is not 
flushed to S3 properly when `snapshotState` of the bucketing sink returns. I’m 
not entirely familiar with the bucketing sink, so this is just a superficial 
guess from what you described.

Flink doesn't seem to keep in its checkpointed state the fact that it acked 
those events but never flushed them to S3.

Keep in mind that this is two separate states we’re talking about here. 1) the 
offsets checkpointed as state of the Kafka consumer source, and 2) bucket state 
(which should keep track of uncommitted events w.r.t. Flink’s checkpoints; 
events are considered as committed in bucketed sinks when the Flink checkpoint 
it is part of is complete). For details on this I recommend checking out the 
class Javadoc of `BucketingSink`.
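
As a side note, since the bucketing sink only finalizes data on completed checkpoints, a minimal sketch of the checkpointing setup you described (retaining externalized checkpoints on cancellation) would be:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 5 seconds; bucketed data only counts as committed once the
// checkpoint it belongs to completes.
env.enableCheckpointing(5000);

// Keep the latest checkpoint when the job is cancelled so the restart can use it
// (requires state.checkpoints.dir to be set in flink-conf.yaml).
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);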

@Sihua
the bucketing sink also manages bucket states to achieve exactly-once semantics.

Cheers,
Gordon

On 20 July 2017 at 10:46:52 AM, 周思华 (summerle...@163.com) wrote:

Hi Fran,

Does the DateTimeBucketer act like a memory buffer that isn't managed by Flink's 
state? If so, then I think the problem is not about Kafka, but about the 
DateTimeBucketer. Flink won't take a snapshot of the DateTimeBucketer if it is not 
part of any state.

Best, 
Sihua Zhou


At 2017-07-20 03:02:20, "Fabian Hueske"  wrote:
Hi Fran,

did you observe actual data loss due to the problem you are describing or are 
you discussing a possible issue based on your observations?

AFAIK, Flink's Kafka consumer keeps track of the offsets itself and includes 
these in the checkpoints. In case of a recovery, it does not rely on the 
offsets which were committed back to Kafka but only on the offsets it 
checkpointed itself.
Gordon (in CC) is familiar with all details of Flink's Kafka consumer and can 
give a more detailed answer.

Best, Fabian

2017-07-19 16:55 GMT+02:00 Francisco Blaya :
Hi,

We have a Flink job running on AWS EMR sourcing a Kafka topic and persisting 
the events to S3 through a DateTimeBucketer. We configured the bucketer to 
flush to S3 with an inactivity period of 5 mins.The rate at which events are 
written to Kafka in the first place is very low so it is easy for us to 
investigate how the Flink job would recover in respect to Kafka offsets after 
the job gets cancelled or the Yarn session killed.

What we found is that Flink acks Kafka immediately before even writing to S3. 
The consequence of this seems to be that if the job gets cancelled before the 
acked events are flushed to S3 then these are lost, they don't get written when 
the job restarts. Flink doesn't seem to keep in its checkpointed state the fact 
that it acked those events but never flushed them to S3. Checkpoints are 
created every 5 seconds in S3.

We've also tried to configure externalized checkpoints throught 
"state.checkpoints.dir" configuration key and 
"env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)"
 in the job so that they don't automatically get cleaned up when the job gets 
cancelled or the Yarn session killed. We can see the job uses a restored 
checkpoint upon restart but still we get missing events in S3.

Has anyone come across this behaviour before? Are we assuming something wrong?

We're using EMR 5.4.0 and Flink 1.2.0.

Regards,
Fran

hivehome.com




Hive | London | Cambridge | Houston | Toronto

Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-20 Thread Tzu-Li (Gordon) Tai
Our parser.parse() function has a one-to-one mapping between an input byte[] 
to a List
Ideally, this should be handled within the KeyedDeserializationSchema passed to 
your Kafka consumer. That would then avoid the need of an extra “parser map 
function” after the source.

Were you suggesting a flatMap instead of map at this stage of 
calling our parser, or did you mean to use a flatMap() after the parser and 
before the split()? 
I meant a flatMap after the parser (whether it’s done as a map function or 
within the Kafka source) and before the split. The flatMap function iterates 
through your per-record lists and collects as it iterates through them.
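
For illustration, a sketch of that flatMap (the types follow your description and are assumptions):

DataStream<List<SuperclassEvent>> parsed = ...;  // output of the Kafka source / parser

DataStream<SuperclassEvent> events = parsed.flatMap(
        new FlatMapFunction<List<SuperclassEvent>, SuperclassEvent>() {
            @Override
            public void flatMap(List<SuperclassEvent> eventList, Collector<SuperclassEvent> out) {
                // Each collected element is handed straight to the chained split/sink operators,
                // so the first event of a list is emitted before the rest of the list is processed.
                for (SuperclassEvent event : eventList) {
                    out.collect(event);
                }
            }
        });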

- Gordon




On 18 July 2017 at 3:02:45 AM, earellano (eric.arell...@ge.com) wrote:

Tzu-Li (Gordon) Tai wrote  
> Basically, when two operators are chained together, the output of the  
> first operator is immediately chained to the processElement of the next  
> operator; it’s therefore just a consecutive invocation of processElements  
> on the chained operators. There will be no thread-to-thread handover or  
> buffering.  

Okay great, chaining tasks does sound like what we want then.  



Tzu-Li (Gordon) Tai wrote  
> In that case, I would suggest using flatMap here, followed by chained  
> splits and then sinks.  

We changed our code to roughly follow this suggestion, but I'm not sure  
we're doing this correctly? Is there a better way you recommend chaining the  
tasks? As written below, are individual Events within the List being sent to  
their respective sinks right away, or does the whole list have to split  
first?  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14312/split-stream.png>
  

We also had issues getting flatMap to work, and map seemed more appropriate.  
Our parser.parse() function has a one-to-one mapping between an input byte[]  
to a List, and that never changes, so a map seems to make  
sense to us. Were you suggesting a flatMap instead of map at this stage of  
calling our parser, or did you mean to use a flatMap() after the parser and  
before the split()?  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288p14312.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-20 Thread Tzu-Li (Gordon) Tai
Glad to hear it’s working!

Yes, normally you should avoid using the lib folder to resolve these dependency 
issues and rely only on user jar packaging when working with Flink connectors.

- Gordon


On 17 July 2017 at 9:44:20 PM, Fabian Wollert (fabian.woll...@zalando.de) wrote:

TL;DR: remove all lucene and elasticsearch libs in your flink env and just use 
maven to manage dependencies, when working with the flink elasticsearch 
connector.

So in the first place I deleted the libs in the folder to see if it would work, 
but it did not. Then we thought that maybe Flink already loads the libs at 
startup, so I packaged our Flink appliance again, without the old Lucene lib 
which was still loaded, and then redeployed, and et voilà, it worked!

thanks guys for the investigation help!

Cheers


--
Fabian Wollert
Zalando SE

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-17 9:58 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
Hi,

I would also recommend checking the `lib/` folder of your Flink installation to 
see if there is any dangling old version jars that you added there.
I did a quick dependency check on the Elasticsearch 2 connector, it is 
correctly pulling in Lucene 5.5.0 only, so this dependency should not pop up 
given that the user code is packaged properly.
As of now, I would guess that it is some dependency conflict caused by either 
the reasons mentioned above, or some other dependency in the user jar is 
pulling in a conflicting Lucene version.

Of course, if you doubt otherwise and that isn’t the case, let us know the 
result of your checks so we can investigate further! Thanks.

Cheers,
Gordon

On 17 July 2017 at 3:38:17 PM, Fabian Wollert (fabian.woll...@zalando.de) wrote:

1.3.0, but i only need the ES 2.X connector working right now, since that's the 
elasticsearch version we're using. Another option would be to upgrade to ES 5 
(at least on dev) to see if it's working as well, but that doesn't sound like 
fixing the problem to me :-D

Cheers
Fabian


--
Fabian Wollert
Zalando SE

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-16 15:47 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
Hi,

There was also a problem in releasing the ES 5 connector with Flink 1.3.0. You 
only said you’re using Flink 1.3, would that be 1.3.0 or 1.3.1?

Best,
Aljoscha

On 16. Jul 2017, at 13:42, Fabian Wollert <fabian.woll...@zalando.de> wrote:

Hi Aljoscha,

we are running Flink in Stand alone mode, inside Docker in AWS. I will check 
tomorrow the dependencies, although i'm wondering: I'm running Flink 1.3 
everywhere and the appropriate ES connector which was only released with 1.3, so 
it's weird where this dependency mix up comes from ... let's see ...

Cheers
Fabian


--
Fabian Wollert
Zalando SE

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-14 11:15 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
This kind of error almost always hints at a dependency clash, i.e. there is 
some version of this code in the class path that clashed with the version that 
the Flink program uses. That’s why it works in local mode, where there are 
probably not many other dependencies and not in cluster mode.

How are you running it on the cluster? Standalone, YARN?

Best,
Aljoscha

On 13. Jul 2017, at 13:56, Fabian Wollert <fabian.woll...@zalando.de> wrote:

Hi Timo, Hi Gordon,

thx for the reply! I checked the connection from both clusters to each other, 
and i can telnet to the 9300 port of flink, so i think the connection is not an 
issue here. 

We are currently using in our live env a custom elasticsearch connector, which 
used some extra lib's deployed on the cluster. i found one lucene lib and 
deleted it (since all dependencies should be in the flink job jar), but that 
unfortunately did not help either ...

Cheers
Fabian


--
Fabian Wollert
Data Engineering
Technology

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-13 13:46 GMT+02:00 Timo Walther <twal...@apache.org>:
Hi Fabian,

I loop in Gordon. Maybe he knows whats happening here.

Regards,
Timo


Am 13.07.17 um 13:26 schrieb Fabian Wollert:
Hi everyone,

I'm trying to make use of the new Elasticsearch Connector. I got a version 
running locally (with ssh tunnels to my Elasticsearch cluster in AWS) in my 
IDE, I see the data in Elasticsearch written perfectly, as I want it. As soon 
as I try to run this on our dev cluster (Flink 1.3.0, running in the same VPC 
like ) though, i get the following error message (in the sink):

java.lang.NoSuchFieldError: LUCENE_5_5_0
at org.elasticsearch.Version.(Version.java:295)
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:129)
at 
org.apache.flink.streaming.connectors.elasticsearch2.Elasticsearch2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:65)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase

Re: FlinkKafkaConsumer010 - Memory Issue

2017-07-19 Thread Tzu-Li (Gordon) Tai
Hi Pedro,

Seems like a memory leak. The only issue I’m currently aware of that may be 
related is [1]. Could you tell if this JIRA relates to what you are bumping 
into?
The JIRA mentions Kafka 09, but a fix is only available for Kafka 010 once we 
bump our Kafka 010 dependency to the latest version.

If that doesn’t relate to you, please let me know and I'll investigate a bit 
more.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-6301


On 20 July 2017 at 3:14:57 AM, Fabian Hueske (fhue...@gmail.com) wrote:

Hi,

Gordon (in CC) knows the details of Flink's Kafka consumer.
He might know how to solve this issue.

Best, Fabian

2017-07-19 20:23 GMT+02:00 PedroMrChaves :
Hello,

Whenever I submit a job to Flink that retrieves data from Kafka the memory
consumption continuously increases. I've changed the max heap memory from
2gb, to 4gb, even to 6gb but the memory consumption keeps reaching the
limit.

An example of a simple job that shows this behavior is depicted below.

          /*
             * Execution Environment Setup
             */
            final StreamExecutionEnvironment environment =
getGlobalJobConfiguration(configDir, configurations);

            /**
             * Collect event data from Kafka
             */
            DataStreamSource s = environment.addSource(new
FlinkKafkaConsumer010(
                    configurations.get(ConfigKeys.KAFKA_INPUT_TOPIC),
                    new SimpleStringSchema(),
                    getKafkaConfiguration(configurations)));

            s.filter(new FilterFunction() {
                @Override
                public boolean filter(String value) throws Exception {
                    return false;
                }
            }).print();

private static Properties getKafkaConfiguration(ParameterTool
configurations) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers",
configurations.get(ConfigKeys.KAFKA_HOSTS));
        properties.put("group.id",
"flink-consumer-"+UUID.randomUUID().toString());
        properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("security.protocol",
configurations.get(ConfigKeys.KAFKA_SECURITY_PROTOCOL));
        properties.put("ssl.truststore.location",
configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_LOCATION));
        properties.put("ssl.truststore.password",
configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_PASSWORD));
        properties.put("ssl.keystore.location",
configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_LOCATION));
        properties.put("ssl.keystore.password",
configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_PASSWORD));
        return properties;
    }


Moreover, when I stop the job, the task manager does not terminate the kafka
connection and the memory is kept allocated. To stop this, I have to kill
the task manager process.

*My Flink version: 1.2.1
Kafka consumer: 010
Kafka version: 2_11_0.10.1.0-2*

I've activated the /taskmanager.debug.memory.startLogThread/ property to
output for every 5 seconds and attached the log with the results.

The output of free -m before submitting the job:
              total        used        free      shared  buff/cache   available
Mem:          15817         245       14755          24         816       15121
Swap:             0           0           0

after having the job running for about 5 min:
 free -m
              total        used        free      shared  buff/cache   available
Mem:          15817        9819        5150          24         847        5547
Swap:             0           0           0

taskmanager.log






-
Best Regards,
Pedro Chaves
--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumer010-Memory-Issue-tp14342.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.



Re: Does FlinkKafkaConsumer010 care about consumer group?

2017-07-19 Thread Tzu-Li (Gordon) Tai
Does this mean I can use the same consumer group G1 for the newer version A'? 
And in spite of the same consumer group, A' will receive messages from all 
partitions when it's started from the savepoint?

Yes. That’s true. Flink internally uses static partition assignment, and the 
clients are assigned whatever partition states they are restored with.
The only “conflict” this would introduce is that both jobs will be competing 
offset committing to the same consumer group in Kafka (again, this doesn’t 
affect exactly-once but might mess up other external monitoring tools you may 
be using).
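
If that bothers you (e.g. for external lag monitoring), on Flink 1.3+ you could switch committing off for one of the two jobs; a sketch, assuming checkpointing is enabled:

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

// Flink 1.3+: don't commit this job's offsets back to the consumer group in Kafka;
// the offsets checkpointed by Flink are still used for exactly-once on restore.
consumer.setCommitOffsetsOnCheckpoints(false);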

I am using Flink 1.2.1. Does the above plan require setting uid on the Kafka 
source in the job?
That would be a separate topic. Setting the UID of operators explicitly is 
usually always recommended before moving to production. See [1].

If your job code hasn’t changed across the restores, then it should be fine 
even if you didn’t set the UID.
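
For reference, assigning a UID is a one-liner on the source (names below are just examples):

DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props))
        .uid("kafka-source");   // stable id used to match this operator's state in savepoints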



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/production_ready.html


On 19 July 2017 at 3:41:28 PM, Moiz S Jinia (moiz.ji...@gmail.com) wrote:

Does this mean I can use the same consumer group G1 for the newer version A'? 
And in spite of the same consumer group, A' will receive messages from all 
partitions when it's started from the savepoint?

I am using Flink 1.2.1. Does the above plan require setting uid on the Kafka 
source in the job?

Thanks,
Moiz

On Wed, Jul 19, 2017 at 1:06 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi!

The only occasions which the consumer group is used is:
1. When committing offsets back to Kafka. Since Flink 1.3, this can be disabled 
completely (both when checkpointing is enabled or disabled). See [1] on details 
about that.
2. When starting fresh (not starting from some savepoint), if you choose to use 
GROUP_OFFSETS as the start position, then the consumer group would also be 
used. If starting from a savepoint, then this is irrelevant. See [2].

Note that it actually isn’t used in any critical paths for Flink’s exactly-once 
processing guarantees, or partition to source parallel instance assignments.

So, the desired behavior in which you described is exactly the expected 
behavior for the Flink Kafka Consumer.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

On 19 July 2017 at 3:23:01 PM, Moiz Jinia (moiz.ji...@gmail.com) wrote:

Below is a plan for downtime-free upgrade of a Flink job. The downstream
consumer of the Flink job is duplicate proof.

Scenario 1 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G1* (12 slot job again)
4. Stop job A.

Scenario 2 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G2* (12 slot job again)
4. Stop job A

Does it matter what consumer group job A' uses? The desired behavior is that
during the window when both A and A' are running, all messages should go to
both jobs. (And of course I want that job A' should start consuming from the
offsets in the savepoint and not the earliest).






--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-FlinkKafkaConsumer010-care-about-consumer-group-tp14323.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.



Re: Does FlinkKafkaConsumer010 care about consumer group?

2017-07-19 Thread Tzu-Li (Gordon) Tai
Hi!

The only occasions which the consumer group is used is:
1. When committing offsets back to Kafka. Since Flink 1.3, this can be disabled 
completely (both when checkpointing is enabled or disabled). See [1] on details 
about that.
2. When starting fresh (not starting from some savepoint), if you choose to use 
GROUP_OFFSETS as the start position, then the consumer group would also be 
used. If starting from a savepoint, then this is irrelevant. See [2].

Note that it actually isn’t used in any critical paths for Flink’s exactly-once 
processing guarantees, or partition to source parallel instance assignments.

So, the desired behavior in which you described is exactly the expected 
behavior for the Flink Kafka Consumer.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

On 19 July 2017 at 3:23:01 PM, Moiz Jinia (moiz.ji...@gmail.com) wrote:

Below is a plan for downtime-free upgrade of a Flink job. The downstream  
consumer of the Flink job is duplicate proof.  

Scenario 1 -  
1. Start Flink job A with consumer group G1 (12 slot job)  
2. While job A is running, take a savepoint AS.  
3. Start newer version of Flink job A' from savepoint AS with consumer group  
*G1* (12 slot job again)  
4. Stop job A.  

Scenario 2 -  
1. Start Flink job A with consumer group G1 (12 slot job)  
2. While job A is running, take a savepoint AS.  
3. Start newer version of Flink job A' from savepoint AS with consumer group  
*G2* (12 slot job again)  
4. Stop job A  

Does it matter what consumer group job A' uses? The desired behavior is that  
during the window when both A and A' are running, all messages should go to  
both jobs. (And of course I want that job A' should start consuming from the  
offsets in the savepoint and not the earliest).  






--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-FlinkKafkaConsumer010-care-about-consumer-group-tp14323.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Flink Elasticsearch Connector: Lucene Error message

2017-07-17 Thread Tzu-Li (Gordon) Tai
Hi,

I would also recommend checking the `lib/` folder of your Flink installation to 
see if there is any dangling old version jars that you added there.
I did a quick dependency check on the Elasticsearch 2 connector, it is 
correctly pulling in Lucene 5.5.0 only, so this dependency should not pop up 
given that the user code is packaged properly.
As of now, I would guess that it is some dependency conflict caused by either 
the reasons mentioned above, or some other dependency in the user jar is 
pulling in a conflicting Lucene version.

Of course, if you doubt otherwise and that isn’t the case, let us know the 
result of your checks so we can investigate further! Thanks.

Cheers,
Gordon

On 17 July 2017 at 3:38:17 PM, Fabian Wollert (fabian.woll...@zalando.de) wrote:

1.3.0, but i only need the ES 2.X connector working right now, since that's the 
elasticsearch version we're using. Another option would be to upgrade to ES 5 
(at least on dev) to see if it's working as well, but that doesn't sound like 
fixing the problem to me :-D

Cheers
Fabian


--
Fabian Wollert
Zalando SE

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-16 15:47 GMT+02:00 Aljoscha Krettek :
Hi,

There was also a problem in releasing the ES 5 connector with Flink 1.3.0. You 
only said you’re using Flink 1.3, would that be 1.3.0 or 1.3.1?

Best,
Aljoscha

On 16. Jul 2017, at 13:42, Fabian Wollert  wrote:

Hi Aljoscha,

we are running Flink in Stand alone mode, inside Docker in AWS. I will check 
tomorrow the dependencies, although i'm wondering: I'm running Flink 1.3 
everywhere and the appropriate ES connector which was only released with 1.3, so 
it's weird where this dependency mix up comes from ... let's see ...

Cheers
Fabian


--
Fabian Wollert
Zalando SE

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-14 11:15 GMT+02:00 Aljoscha Krettek :
This kind of error almost always hints at a dependency clash, i.e. there is 
some version of this code in the class path that clashed with the version that 
the Flink program uses. That’s why it works in local mode, where there are 
probably not many other dependencies and not in cluster mode.

How are you running it on the cluster? Standalone, YARN?

Best,
Aljoscha

On 13. Jul 2017, at 13:56, Fabian Wollert  wrote:

Hi Timo, Hi Gordon,

thx for the reply! I checked the connection from both clusters to each other, 
and i can telnet to the 9300 port of flink, so i think the connection is not an 
issue here. 

We are currently using in our live env a custom elasticsearch connector, which 
used some extra lib's deployed on the cluster. i found one lucene lib and 
deleted it (since all dependencies should be in the flink job jar), but that 
unfortunately did not help either ...

Cheers
Fabian


--
Fabian Wollert
Data Engineering
Technology

E-Mail: fabian.woll...@zalando.de
Location: ZMAP

2017-07-13 13:46 GMT+02:00 Timo Walther :
Hi Fabian,

I loop in Gordon. Maybe he knows whats happening here.

Regards,
Timo


Am 13.07.17 um 13:26 schrieb Fabian Wollert:
Hi everyone,

I'm trying to make use of the new Elasticsearch Connector. I got a version 
running locally (with ssh tunnels to my Elasticsearch cluster in AWS) in my 
IDE, I see the data in Elasticsearch written perfectly, as I want it. As soon 
as I try to run this on our dev cluster (Flink 1.3.0, running in the same VPC 
like ) though, i get the following error message (in the sink):

java.lang.NoSuchFieldError: LUCENE_5_5_0
at org.elasticsearch.Version.(Version.java:295)
at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:129)
at 
org.apache.flink.streaming.connectors.elasticsearch2.Elasticsearch2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:65)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

I first thought that this has something to do with mismatched versions, but it 
happens to me with Elasticsearch 2.2.2 (bundled with Lucene 5.4.1) and 
Elasticsearch 2.3 (bundled with Lucene 5.5.0). 

Can someone point to what exact version conflict is happening here (or where to 
investigate further)? Currently my set up looks like everything is actually 
running with Lucene 5.5.0, so I'm wondering where that error message is exactly 
coming from. And also why it is running locally, but not in the cluster. I'm 
still 

Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-17 Thread Tzu-Li (Gordon) Tai
With task chaining as you're saying, could you help clarify how it works 
please?
Operators can be chained to be executed by a single task thread. See [1] for 
more details on that.

Basically, when two operators are chained together, the output of the first 
operator is immediately chained to the processElement of the next operator; 
it’s therefore just a consecutive invocation of processElements on the chained 
operators. There will be no thread-to-thread handover or buffering.

For example, a 
byte[] record can return from our parser a List of 10 SuccessEvents and 1 
ErrorEvent; we want to publish each Event immediately.
In that case, I would suggest using flatMap here, followed by chained splits 
and then sinks.

Using flatMap, you can collect elements as you iterate through the list element 
(i.e. `collector.collect(...)`). If the sinks are properly chained (which 
should be the case if there is no keyBy before the sink and you haven’t 
explicitly configured otherwise [2]), then for each .collect(...) the sink 
write will be invoked as part of the chain.

Effectively, this would then be writing to Kafka / Cassandra for every element 
as you iterate through that list (happening in the same thread since everything 
is chained), and matches what you have in mind.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html#tasks-and-operator-chains
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
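
For completeness, a sketch of the split step after that flatMap (`events` is the DataStream<SuperclassEvent> it produces; topic, schema and sink setup are placeholders):

SplitStream<SuperclassEvent> split = events.split(new OutputSelector<SuperclassEvent>() {
    @Override
    public Iterable<String> select(SuperclassEvent event) {
        return Collections.singletonList(event instanceof ErrorEvent ? "error" : "success");
    }
});

// Error events go to Kafka ("error-topic" and EventSerializationSchema are made-up names).
split.select("error")
     .addSink(new FlinkKafkaProducer010<>("error-topic", new EventSerializationSchema(), kafkaProps));

// Success events go to Cassandra; how that sink is built depends on your data model,
// e.g. via CassandraSink.addSink(split.select("success")).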

On 17 July 2017 at 2:06:52 PM, earellano (eric.arell...@ge.com) wrote:

Hi,  

Tzu-Li (Gordon) Tai wrote  
> This seems odd. Are your events intended to be a list? If not, this  
> should be a `DataStream<SuperclassEvent>`.  
>  
> From the code snippet you’ve attached in the first post, it seems like  
> you’ve initialized your source incorrectly.  
>  
> `env.fromElements(List<...>)` will take the whole list as a single event,  
> thus your source is only emitting a single list as a record.  

Ah sorry for the confusion. So the original code snippet isn't our actual  
code - it's a simplified and generified version so that it would be easy to  
reproduce the Null Pointer Exception without having to show our whole code  
base.  

To clarify, our input actually uses a Kafka Consumer that reads a byte[],  
which is then passed to our external library parser which takes a byte[] and  
converts it into a List. This is why we have to use  
DataStream<ListEvents>>, rather than just DataStream. It's a  
requirement from the parser we have to use, because each byte[] array record  
can create both a SuccessEvent(s) and/or ErrorEvent(s).  

Our motivation for using the above map & for loop with conditional output  
logic was that we have to work with this whole List and not just  
individual Events, but don't want to wait for the whole list to be processed  
for the event at the beginning of the list to be outputted. For example, a  
byte[] record can return from our parser a List of 10 SuccessEvents and 1  
ErrorEvent; we want to publish each Event immediately. Low latency is  
extremely important to us.  

--  

With task chaining as you're saying, could you help clarify how it works  
please? With each record of type List and calling the Split Operator  
followed by the sink operators, does that whole record/list have to be split  
before it can then go on to the sink? Or does task chaining mean it  
immediately gets outputted to the sink?  


Thanks so much for all this help by the way!  




--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288p14300.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

2017-07-16 Thread Tzu-Li (Gordon) Tai
Hi,

Here’s an example:

DataStream<String> inputStream = …;

inputStream.addSink(new FlinkKafkaProducer09<>(
    “defaultTopic”, new CustomKeyedSerializationSchema(), props));

Code for CustomKeyedSerializationSchema:
public class CustomKeyedSerializationSchema implements 
KeyedSerializationSchema<String> {
    byte[] serializeKey(String element) {…}
    byte[] serializeValue(String element) {…}
    String getTargetTopic(String element) {
      ...
    }
}

For the above code, by default records will always be sent to the 
“defaultTopic” topic.
However, for each record, `getTargetTopic` will also be called. Whatever is 
returned from there will override “defaultTopic”.
You can place your switch there.
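
For example, a sketch of such a switch (topic names are made up; as far as I recall, returning null falls back to the default topic):

@Override
public String getTargetTopic(String element) {
    // route records by content; "test1" / "test2" are just example topic names
    if (element.contains("error")) {
        return "test2";
    }
    return "test1";
}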

Cheers,
Gordon

On 6 July 2017 at 11:43:55 PM, Richard Xin (richardxin...@yahoo.com) wrote:

Thanks,
I'm not sure I understand this. What I need is to have a single process 
subscribing to multiple Kafka topics, and have a switch clause for different 
topics in my SinkFunction. Did you mean I need to change the way the Kafka 
producer produces the message? 
Any pointer to code samples would be appreciated. 
Thanks Again
Richard


On Wednesday, July 5, 2017, 10:25:59 PM PDT, Tzu-Li (Gordon) Tai 
<tzuli...@apache.org> wrote:


Hi Richard,

Producing to multiple topics is treated a bit differently in the Flink Kafka 
producer.
You need to set a single default target topic, and in 
`KeyedSerializationSchema#getTargetTopic()` you can override the default topic 
with whatever is returned. The `getTargetTopic` method is invoked for each 
record.

Cheers,
Gordon


On 6 July 2017 at 9:09:29 AM, Richard Xin (richardxin...@yahoo.com) wrote:

when using FlinkKafkaConsumer010 to subscribe to multiple topics, as in 
List<String> topics = Arrays.asList("test1","test2");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(topics, 
new SimpleStringSchema(), properties));

How do I get topic names in my SinkFunction? i.e. stream.addSink()

Thanks,
Richard

Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-16 Thread Tzu-Li (Gordon) Tai
Hi,

void output(DataStream<List<SuperclassEvent>> inputStream) {
This seems odd. Are your events intended to be a list? If not, this should be 
a `DataStream<SuperclassEvent>`.

From the code snippet you’ve attached in the first post, it seems like you’ve 
initialized your source incorrectly.

`env.fromElements(List<...>)` will take the whole list as a single event, thus 
your source is only emitting a single list as a record. Perhaps what you 
actually want to do here is `env.fromCollection(List<...>)`?

This should also eliminate the situation that “only after the whole List is 
processed can the records then be sent to their respective sinks”, as you 
mentioned in your reply.

but this doesn't seem very ideal to us because it requires a new operator to 
first split the stream
IMO, this wouldn’t really introduce noticeable overhead, as the operator will 
be chained to the map operator. Side outputs is also the preferred way here, as 
side outputs subsume stream splitting.



Overall, I think it is reasonable to do a map -> split -> Kafka / Cassandra 
sinks in your case, given that you’ve declared the source correctly to be a 
single SuperclassEvent as a record.

The operator overhead is fairly trivial if it is chained. Another reason to use 
sinks properly is that only then will you benefit from the exactly-once / 
at-least-once delivery guarantees to external systems (which requires 
collaboration between the sink and Flink’s checkpointing).

Hope this helps!

Cheers,
Gordon


On 17 July 2017 at 2:59:38 AM, earellano [via Apache Flink User Mailing List 
archive.] (ml+s2336050n14294...@n4.nabble.com) wrote:

Tzu-Li (Gordon) Tai wrote
It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or is 
there any specific reason why you want to emit elements to Kafka in a map 
function?

The correct way to use it is to add it as a sink function to your pipeline, i.e.

DataStream someStream = …
someStream.addSink(new FlinkKafkaProducer010<>(“topic”, schema, props));
// or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, “topic”, 
schema, props);
The reason we want to use processElement() & a map function, instead of 
someStream.addSink() is that our output logic has conditional depending on the 
type of record we have.

Our overall program follows this path:

  1. Serialized JSON consumed from Kafka: DataStream<String>
  2. Parsed, producing a List of successful events and/or error events: DataStream<List<SuperclassEvent>>
  3. Outputted conditionally, going to Kafka or Cassandra depending on which type of event it is.


This is our code for output logic (although modified types to not use our IP):

void output(DataStream<List<SuperclassEvent>> inputStream) {
    inputStream.map(eventList -> {
        for (SuperclassEvent event : eventList) {
            if (event instanceof SuccessEvent)
                emitToCassandra(event);
            else if (event instanceof ErrorEvent)
                emitToKafka(event);
        }
        return true;  // we don't actually want to return anything, just don't know how else to use map
    });
}

 
That is, we have sinks for both Kafka and Cassandra, and want to be able to 
iterate through our List and conditionally send each 
individual record to its appropriate sink depending on its type.

I know Flink offers SplitStreams for a similar purpose, but this doesn't seem 
very ideal to us because it requires a new operator to first split the stream, 
and only after the whole List is processed can the records be sent to their 
respective sinks. The code above, in contrast, sends each record to its sink 
immediately upon determining its type.

--

Is there any way to make processElement() work so that we can work on 
individual records instead of the whole DataStream? Or are we misusing Flink? 
How do you recommend doing this the best way possible?


--

Also, if processElement() and invoke() aren't meant to be used, should they be 
made package private? Happy to make a pull request if so, although fear that 
might break a few things.

If you reply to this email, your message will be added to the discussion below:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288p14294.html
This email was sent by earellano (via Nabble)
To receive all replies by email, subscribe to this discussion


Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-14 Thread Tzu-Li (Gordon) Tai
Hi,

It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or is 
there any specific reason why you want to emit elements to Kafka in a map 
function?

The correct way to use it is to add it as a sink function to your pipeline, i.e.

DataStream someStream = …
someStream.addSink(new FlinkKafkaProducer010<>(“topic”, schema, props));
// or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, “topic”, 
schema, props);

The processElement is invoked internally by the system, and isn’t intended to 
be invoked by user code.

See [1] for more details.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producer

On 15 July 2017 at 3:35:32 AM, earellano (eric.arell...@ge.com) wrote:

I'm getting a NullPointerException when calling  
FlinkKafkaProducer010.processElement(StreamRecord). Specifically, this comes  
from its helper function invokeInternally(): the function's  
internalProducer is not configured properly, resulting in a null  
value being passed to one of its helper functions.  

We'd really appreciate taking a look at below to see if this is a Flink bug  
or something we're doing wrong.  

Our code  

This is a simplified version of our program:  

  

  

You can copy this code here to reproduce locally:  
https://pastebin.com/Li8iZuFj   

Stack trace  

Here is the stack trace:  

  

What causes error in Flink code  

The method processElement() calls invokeInternally(). Within  
invokeInternally(), Flink tries to parse variable values, e.g. topic name  
and partitions.  

The app fails when trying to resolve the partitions. Specifically, the  
method to resolve the partitions has a parameter of KafkaProducer, which is  
passed as null, resulting in the NullPointerException. See the highlighted  
lines below of running the program in debugger view.  

  

So, I think the issue is that the internalProducer is not being setup  
correctly. Namely, it never sets the value for its producer field, so this  
stays null and then gets passed around, resulting in the Null Pointer  
Exception.  

Bug? Or issue with our code?  

My question to you all is if this is a bug that needs to be fixed, or if it  
results from us improperly configuring our program? The above code shows our  
configuration within the program itself (just setting bootstrap.servers),  
and we created the Kafka topic on our local machine as follows:  

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor  
1 --partitions 1 --topic process-elements-tests  



Any help greatly appreciated! We're really hoping to get processElement()  
to work, because our streaming architecture requires working on individual  
elements rather than the entire data stream (sink behavior depends on the  
individual values within each record of our DataStream<List<SuperclassEvent>>).  





--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: High back-pressure after recovering from a save point

2017-07-14 Thread Tzu-Li (Gordon) Tai
Can you try starting from the savepoint, but telling Kafka to start from the 
latest offset?

(@gordon: Is that possible in Flink 1.3.1 or only in 1.4-SNAPSHOT ?)
This is already possible in Flink 1.3.x. 
`FlinkKafkaConsumer#setStartFromLatest()` would be it.
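
For reference, a minimal sketch of that configuration (topic, schema, and properties are placeholders):

FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), kafkaProps);
consumer.setStartFromLatest();   // ignore committed group offsets and read from the latest records

DataStream<String> stream = env.addSource(consumer);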


On 15 July 2017 at 12:33:53 AM, Stephan Ewen (se...@apache.org) wrote:

Can you try starting from the savepoint, but telling Kafka to start from the 
latest offset?

(@gordon: Is that possible in Flink 1.3.1 or only in 1.4-SNAPSHOT ?)

On Fri, Jul 14, 2017 at 11:18 AM, Kien Truong  wrote:
Hi,

Sorry for the version typo, I'm running 1.3.1. I did not test with 1.2.x.

The job runs fine with almost 0 back-pressure if it's started from scratch or 
if I reuse the Kafka consumer group id without specifying the savepoint.

Best regards,
Kien
On Jul 14, 2017, at 15:59, Stephan Ewen  wrote:
Hi!

Flink 1.3.2 does not yet exist. Do you mean 1.3.1 or latest master?

Can you tell us whether this occurs only in 1.3.x and worked well in 1.2.x?
If you just keep the job running without savepoint/restore, you do not get into 
backpressure situations?

Thanks,
Stephan


On Fri, Jul 14, 2017 at 1:15 AM, Kien Truong  wrote:
Hi Fabian,
This happens to me even when the restore is immediate, so there's not much data 
in Kafka to catch up (5 minutes max)

Regards
Kien
On Jul 13, 2017, at 23:40, Fabian Hueske < fhue...@gmail.com> wrote:
I would guess that this is quite usual because the job has to "catch-up" work.
For example, if you took a save point two days ago and restore the job today, 
the input data of the last two days has been written to Kafka (assuming Kafka 
as source) and needs to be processed.
The job will now read as fast as possible from Kafka to catch up to the 
present. This means the data is ingested much faster (as fast as Kafka can read 
and ship it) than during regular processing (as fast as your sources produce).
The processing speed is bound by your Flink job which means there will be 
backpressure.

Once the job caught-up, the backpressure should disappear.

Best, Fabian

2017-07-13 15:48 GMT+02:00 Kien Truong :
Hi all,

I have one job where back-pressure  is significantly higher after resuming from 
a save point.

Because that job makes heavy use of stateful functions with RocksDBStateBackend,

I suspect that this is the cause of the performance degradation.

Does anyone encounter similar issues or have any tips for debugging?


I'm using Flink 1.3.2 with YARN in detached mode.


Regards,

Kien






Re: Fink: KafkaProducer Data Loss

2017-07-13 Thread Tzu-Li (Gordon) Tai
Hi Ninad & Piotr,

AFAIK, when this issue was reported, Ninad was using 09.
FLINK-6996 only affects Flink Kafka Producer 010, so I don’t think that’s the 
cause here.

@Ninad
Code to reproduce this would definitely be helpful here, thanks. If you prefer 
to provide that privately, that would also be fine.

Cheers,
Gordon

On 13 July 2017 at 4:13:07 PM, Piotr Nowojski (pi...@data-artisans.com) wrote:

Hi,

I’m not sure how relevant is this, but recently I have found and fixed a bug, 
that in certain conditions was causing data losses for all of the 
FlinkKafkaProducers in Flink:

https://issues.apache.org/jira/browse/FLINK-6996

Namely, on checkpoint the “flush” method was not being called. It should be fixed in 
the Flink 1.3.2 and 1.4 releases.

Piotrek

On Jul 12, 2017, at 7:32 PM, ninad  wrote:

Hey guys, any update on this? If needed I can attach our code.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p14224.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.



Re: Kafka Connectors

2017-07-05 Thread Tzu-Li (Gordon) Tai
Since you’re placing jars in the lib/ folder yourself instead of packaging an 
uber jar, you also need to provide the Kafka dependency jars.

It usually isn’t recommended to place dependencies in the lib/ folder. 
Packaging an uber jar is the recommended approach.

Using the maven-shade-plugin, you can build an uber jar. For example, add the 
following to your project Maven POM:



<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <finalName>uber-${artifactId}-${version}</finalName>
            </configuration>
        </execution>
    </executions>
</plugin>

Best,
Gordon
On 6 July 2017 at 1:02:40 AM, Paolo Cristofanelli 
(cristofanelli.pa...@gmail.com) wrote:

Hi Gordon,

thanks for your answer. I hadn't seen that documentation; I have tried to 
download the jar file and put it in the Flink lib folder. 
I downloaded the following jar file, flink-connector-kafka-0.8_2.10, from 
[1]. But it seems it is not enough, because now I receive 
java.lang.ClassNotFoundException: kafka.producer.Partitioner.
I do not understand: in my Maven POM I only included what I specified in the 
previous email, so why would Flink need other jars, and how can I track them down?

Cheers,
Paolo

[1]  
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8_2.10/1.0.0


On 5 July 2017 at 18:20, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Paolo,

Have you followed the instructions in this documentation [1]?

The connectors are not part of the binary distributions, so you would need to 
bundle the dependencies with your code by building an uber jar.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html


On 6 July 2017 at 12:04:47 AM, Paolo Cristofanelli 
(cristofanelli.pa...@gmail.com) wrote:

Hi, 
I am following the basic steps to implement a consumer and a producer with 
Kafka for Flink. My Flink version is 1.2.0 and my Kafka version is 0.10.2.0, so in 
my pom.xml I add the following:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
    <version>1.2.0</version>
</dependency>


The problem is that if I run the program with maven or in my IDE it works. When 
I upload the jar on flink I get : java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010

I googled a bit and I found out that usually these problems are caused by a 
version problem but I cannot understand where the error is. 

Best,
Paolo 



Re: how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

2017-07-05 Thread Tzu-Li (Gordon) Tai
Hi Richard,

Producing to multiple topics is treated a bit differently in the Flink Kafka 
producer.
You need to set a single default target topic, and in 
`KeyedSerializationSchema#getTargetTopic()` you can override the default topic 
with whatever is returned. The `getTargetTopic` method is invoked for each 
record.

Cheers,
Gordon


On 6 July 2017 at 9:09:29 AM, Richard Xin (richardxin...@yahoo.com) wrote:

when using FlinkKafkaConsumer010 to subscribe to multiple topics as
List<String> topics = Arrays.asList("test1", "test2");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(topics,
    new SimpleStringSchema(), properties));

How do I get topic names in my SinkFunction? i.e. stream.addSink()

Thanks,
Richard

Re: Kafka Connectors

2017-07-05 Thread Tzu-Li (Gordon) Tai
Hi Paolo,

Have you followed the instructions in this documentation [1]?

The connectors are not part of the binary distributions, so you would need to 
bundle the dependencies with your code by building an uber jar.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html


On 6 July 2017 at 12:04:47 AM, Paolo Cristofanelli 
(cristofanelli.pa...@gmail.com) wrote:

Hi, 
I am following the basic steps to implement a consumer and a producer with 
Kafka for Flink. My Flink version is 1.2.0 and my Kafka version is 0.10.2.0, so in 
my pom.xml I add the following:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
    <version>1.2.0</version>
</dependency>


The problem is that if I run the program with maven or in my IDE it works. When 
I upload the jar on flink I get : java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010

I googled a bit and I found out that usually these problems are caused by a 
version problem but I cannot understand where the error is. 

Best,
Paolo 

Re:Re: Error when set RocksDBStateBackend option in Flink?

2017-06-29 Thread Tzu-Li (Gordon) Tai
Hi!

Thanks a lot for reporting this.
It turns out that this is a nasty bug: 
https://issues.apache.org/jira/browse/FLINK-7041.

Aljoscha is working on fixing it already. It’s definitely a critical bug, so 
we’ll try to include in the next bugfix release.

Cheers,
Gordon
On 29 June 2017 at 7:05:09 PM, 周思华 (summerle...@163.com) wrote:


I will keep the call so that I can specialize my RocksDB options later. 
OptionsFactory already extends the java.io.Serializable interface and 
MRocksDBFactory implements OptionsFactory, so MRocksDBFactory should be 
serializable. Why does this problem occur? 

At 2017-06-29 17:53:07, "Ted Yu"  wrote:
Since MRocksDBFactory doesn't add any options, it seems the 
rocksDBBackEnd.setOptions() call can be skipped.

If you choose to keep the call, please take a look at (OptionsFactory extends 
java.io.Serializable):

https://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html

On Thu, Jun 29, 2017 at 2:16 AM, 周思华  wrote:
I use the follow code to set RocksDBStateBackend and it option, it can run 
correctly locally, but can't be submitted to cluster.

Main.class:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    RocksDBStateBackend rocksDBBackEnd = new RocksDBStateBackend("file:///Users/zsh/tmp/rocksdb");
    rocksDBBackEnd.setPredefinedOptions(PredefinedOptions.DEFAULT);
    rocksDBBackEnd.setOptions(new MRocksDBFactory());
    env.setStateBackend(rocksDBBackEnd);
    ...
    env.execute(jobName);
}

MRocksDBFactory.class:
public class MRocksDBFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions;
    }
}

The exception info in jobmanager.log look like below:

2017-06-29 16:29:27,162 WARN  akka.remote.ReliableDeliverySupervisor            
            - Association with remote system 
[akka.tcp://flink@10.242.98.255:52638] has failed, address is now gated for 
[5000] ms. Reason: [gerryzhou.MRocksDBFactory]
2017-06-29 16:29:27,163 ERROR Remoting                                          
            - gerryzhou.MRocksDBFactory
java.lang.ClassNotFoundException: gerryzhou.MRocksDBFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
at 
akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at 

Re: Fwd: Incremental aggregation using Fold and failure recovery

2017-06-29 Thread Tzu-Li (Gordon) Tai
Sorry, one typo.
public AverageAccumulator merge(WindowStats a, WindowStats b) {
should be:
public WindowStats merge(WindowStats a, WindowStats b) {
On 29 June 2017 at 8:22:34 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

I see. Then yes, a fold operation would be more efficient here.


However, can you give an idea on how to use aggregateFunction in latest flink 
to replace the following fold function?
Sure! The documentation for 1.3 is still a bit lagging behind for some of the 
new features, but the Javadoc for `AggregateFunction` should be rather 
self-explaining.

As a quick sketch, here’s what you would do to achieve the same thing:

public class WindowStatsAggregator implements AggregateFunction<IN, WindowStats, OUT> {

    public WindowStats createAccumulator() {
        return new WindowStats();
    }

    public AverageAccumulator merge(WindowStats a, WindowStats b) {
        // merge the two unique products maps in your WindowStats
    }

    public void add(IN value, WindowStats acc) {
        // update your unique products map
    }

    public OUT getResult(WindowStats acc) {
        return acc.getMap();
    }
}

As you can see, the `AggregateFunction` is more generic, and should subsume 
whatever you were previously doing with fold.
Your previous `WindowStats` class is basically the state accumulator, and you 
need to implement how to update it, merge two accumulators, and retrieve the 
final accumulated result.

For more info, I would point to the class Javadocs of `AggregateFunction`.
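
And as a minimal usage sketch (key fields and window sizes taken from the snippets earlier in this thread; OUT stands for whatever getResult returns), aggregate() replaces the fold() call:

DataStream<OUT> eventStream = inputStream
    .keyBy(TENANT, CATEGORY)
    .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
    .aggregate(new WindowStatsAggregator());
    // an aggregate(aggregateFunction, windowFunction) overload also exists if you
    // still need the window function from the original fold() call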

Best,
Gordon
On 29 June 2017 at 8:06:25 PM, Ahmad Hassan (ahmad.has...@gmail.com) wrote:

Hi Gordon,

Thanks for the details. I am using fold to process events and maintain 
statistics per product ID within the WindowStats instance. So fold is much more 
efficient, because events can number in the millions while unique products will 
be fewer than 50k. However, if I use a generic window function, it will be less 
efficient, because the window function will receive a collection of millions of 
events, and they will be replicated for each sliding window since Flink 
replicates events per sliding window.

However, can you give an idea on how to use aggregateFunction in latest flink 
to replace the following fold function?

final DataStream eventStream = inputStream
.keyBy(TENANT, CATEGORY)
.window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5)))
.fold(new WindowStats(), newProductAggregationMapper(), 
newProductAggregationWindowFunction());

Thanks!

On 29 June 2017 at 12:57, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Ahmad,

Yes, that is correct. The aggregated fold value (i.e. your WindowStats 
instance) will be checkpointed by Flink as managed state, and restored from the 
last complete checkpoint in case of failures.
One comment on using the fold function: if what you’re essentially doing in the 
fold is just collecting the elements of the windows per key and performing the 
actual aggregation in the window function, then you don't need the fold.
A generic window function should suit that case. See [1].

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction---the-generic-case


On 29 June 2017 at 5:11:58 PM, Ahmad Hassan (ahmad.has...@gmail.com) wrote:

Any thoughts on this problem please?


Hi All,

I am collecting millions of events per 24hour for 'N' number of products where 
'N' can be 50k. I use the following fold mechanism with sliding window:

final DataStream eventStream = inputStream
.keyBy(TENANT, CATEGORY)
.window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
.fold(new WindowStats(), newProductAggregationMapper(), 
newProductAggregationWindowFunction());

In the WindowStats class, I keep a HashMap keyed by product which holds each 
product's event count and various other metrics. So for 50k products I will 
have 50k entries in the map within the WindowStats instance, instead of 
millions of events, since the fold function processes each event as it arrives.

My question is: if I set env.enableCheckpointing(1000), will the WindowStats 
instance for each existing window automatically be checkpointed and restored on 
recovery? If not, how can I better implement the above use case so that the 
state of the WindowStats object is stored within the fold operation?

Thanks for all the help.

Best Regards,




Re: Fwd: Incremental aggregation using Fold and failure recovery

2017-06-29 Thread Tzu-Li (Gordon) Tai
I see. Then yes, a fold operation would be more efficient here.


However, can you give an idea on how to use aggregateFunction in latest flink 
to replace the following fold function?
Sure! The documentation for 1.3 is still a bit lagging behind for some of the 
new features, but the Javadoc for `AggregateFunction` should be rather 
self-explaining.

As a quick sketch, here’s what you would do to achieve the same thing:

public class WindowStatsAggregator implements AggregateFunction<IN, WindowStats, OUT> {

    public WindowStats createAccumulator() {
        return new WindowStats();
    }

    public AverageAccumulator merge(WindowStats a, WindowStats b) {
        // merge the two unique products maps in your WindowStats
    }

    public void add(IN value, WindowStats acc) {
        // update your unique products map
    }

    public OUT getResult(WindowStats acc) {
        return acc.getMap();
    }
}

As you can see, the `AggregateFunction` is more generic, and should subsume 
whatever you were previously doing with fold.
Your previous `WindowStats` class is basically the state accumulator, and you 
need to implement how to update it, merge two accumulators, and retrieve the 
final accumulated result.

For more info, I would point to the class Javadocs of `AggregateFunction`.

Best,
Gordon
On 29 June 2017 at 8:06:25 PM, Ahmad Hassan (ahmad.has...@gmail.com) wrote:

Hi Gordon,

Thanks for the details. I am using fold to process events and maintain 
statistics per product ID within the WindowStats instance. So fold is much more 
efficient, because events can number in the millions while unique products will 
be fewer than 50k. However, if I use a generic window function, it will be less 
efficient, because the window function will receive a collection of millions of 
events, and they will be replicated for each sliding window since Flink 
replicates events per sliding window.

However, can you give an idea on how to use aggregateFunction in latest flink 
to replace the following fold function?

final DataStream eventStream = inputStream
.keyBy(TENANT, CATEGORY)
.window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5)))
.fold(new WindowStats(), newProductAggregationMapper(), 
newProductAggregationWindowFunction());

Thanks!

On 29 June 2017 at 12:57, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Ahmad,

Yes, that is correct. The aggregated fold value (i.e. your WindowStats 
instance) will be checkpointed by Flink as managed state, and restored from the 
last complete checkpoint in case of failures.
One comment on using the fold function: if what you’re essentially doing in the 
fold is just collecting the elements of the windows per key and performing the 
actual aggregation in the window function, then you don't need the fold.
A generic window function should suit that case. See [1].

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction---the-generic-case


On 29 June 2017 at 5:11:58 PM, Ahmad Hassan (ahmad.has...@gmail.com) wrote:

Any thoughts on this problem please?


Hi All,

I am collecting millions of events per 24hour for 'N' number of products where 
'N' can be 50k. I use the following fold mechanism with sliding window:

final DataStream eventStream = inputStream
.keyBy(TENANT, CATEGORY)
.window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
.fold(new WindowStats(), newProductAggregationMapper(), 
newProductAggregationWindowFunction());

In the WindowStats class, I keep a HashMap keyed by product which holds each 
product's event count and various other metrics. So for 50k products I will 
have 50k entries in the map within the WindowStats instance, instead of 
millions of events, since the fold function processes each event as it arrives.

My question is: if I set env.enableCheckpointing(1000), will the WindowStats 
instance for each existing window automatically be checkpointed and restored on 
recovery? If not, how can I better implement the above use case so that the 
state of the WindowStats object is stored within the fold operation?

Thanks for all the help.

Best Regards,




Re: Fwd: Incremental aggregation using Fold and failure recovery

2017-06-29 Thread Tzu-Li (Gordon) Tai
Hi Ahmad,

Yes, that is correct. The aggregated fold value (i.e. your WindowStats 
instance) will be checkpointed by Flink as managed state, and restored from the 
last complete checkpoint in case of failures.
One comment on using the fold function: if what you’re essentially doing in the 
fold is just collecting the elements of the windows per key and performing the 
actual aggregation in the window function, then you don't need the fold.
A generic window function should suit that case. See [1].

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction---the-generic-case


On 29 June 2017 at 5:11:58 PM, Ahmad Hassan (ahmad.has...@gmail.com) wrote:

Any thoughts on this problem please?


Hi All,

I am collecting millions of events per 24hour for 'N' number of products where 
'N' can be 50k. I use the following fold mechanism with sliding window:

final DataStream eventStream = inputStream
.keyBy(TENANT, CATEGORY)
.window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
.fold(new WindowStats(), newProductAggregationMapper(), 
newProductAggregationWindowFunction());

In the WindowStats class, I keep a HashMap keyed by product which holds each 
product's event count and various other metrics. So for 50k products I will 
have 50k entries in the map within the WindowStats instance, instead of 
millions of events, since the fold function processes each event as it arrives.

My question is: if I set env.enableCheckpointing(1000), will the WindowStats 
instance for each existing window automatically be checkpointed and restored on 
recovery? If not, how can I better implement the above use case so that the 
state of the WindowStats object is stored within the fold operation?

Thanks for all the help.

Best Regards,



Re: Flink 1.2.0 - Flink Job restart config not using Flink Cluster restart config.

2017-06-29 Thread Tzu-Li (Gordon) Tai
Hi Vera,

Apparently, if there is no job-specific restart strategy, an infinite 
FixedDelayRestartStrategy is always used for the job submission:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L571-L576

IMO, this seems to be a bug, as the global restart strategy config should be 
respected. I’ll get back to this once I confirm this.
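
In the meantime, a possible workaround (a minimal sketch; the attempt count and delay are just examples) is to set a job-specific restart strategy explicitly in the program:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(params.getLong("checkpoint.interval", 3000L));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,                                // number of restart attempts
    Time.of(10, TimeUnit.SECONDS)));  // delay between attempts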

Regards,
Gordon

On 28 June 2017 at 10:22:37 PM, Vera Coberley (veracober...@gmail.com) wrote:

Hi all,

We are running Flink 1.2.0. Our flink-conf.yaml is configured to use a default 
restart-strategy of fixed-delay, with 3 attempts:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3

These settings are echoed by the GlobalConfiguration (see first set of log 
statements). However, the job is submitted with a maxNumberRestartAttempts of 
Max INT instead of 3 (see second set of log statements)

The job is enabled for checkpointing, and it does not have any job-specific 
restart strategy defined:
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(params.getLong("checkpoint.interval", 3000L)); 

I assumed the default restart configuration would carry over to the job. Am I 
mistaken in my assumption, do I have a configuration error, or is this a bug?

-- Vera

2017-06-27 19:52:11.288 [main] INFO  
org.apache.flink.configuration.GlobalConfiguration  - Loading configuration 
property: restart-strategy, fixed-delay
2017-06-27 19:52:11.288 [main] INFO  
org.apache.flink.configuration.GlobalConfiguration  - Loading configuration 
property: restart-strategy.fixed-delay.attempts, 3
2017-06-27 19:52:17.642 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.yarn.YarnJobManager  - Submitting job XYZ
2017-06-27 19:52:17.652 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.yarn.YarnJobManager  - Using restart strategy 
FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
delayBetweenRestartAttempts=1) for XYZ

Re: New message processing time after recovery.

2017-06-28 Thread Tzu-Li (Gordon) Tai
Hi!

That is correct. With processing time, all time-based operations will use the 
current machine system time (which would take the recovery time into account).
Note that with processing time, the elements don’t carry a meaningful timestamp.

Best,
Gordon


On 28 June 2017 at 11:22:43 AM, yunfan123 (yunfanfight...@foxmail.com) wrote:

For example, my job failed in timestamp 1.  
Recovery from checkpoint takes 600 seconds.  
So the new elements' processing time into my streams is 601?  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/New-message-processing-time-after-recovery-tp14032.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Partition index from partitionCustom vs getIndexOfThisSubtask downstream

2017-06-28 Thread Tzu-Li (Gordon) Tai
Hi Urs,

Yes, the returned “index” from the custom partitioner refers to the parallel 
subtask index.
I agree that the mismatching terminology used could be slightly misleading. 
Could you open a JIRA to improve the Javadoc for that? Thanks!
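
For illustration, a minimal sketch (the MyEvent type and its id field are placeholders); the int returned by the Partitioner is the index of the parallel subtask that receives the record:

DataStream<MyEvent> partitioned = stream.partitionCustom(
    new Partitioner<Long>() {
        @Override
        public int partition(Long key, int numPartitions) {
            // this value equals getIndexOfThisSubtask() in the receiving subtask
            return (int) (key % numPartitions);   // assumes non-negative ids
        }
    },
    new KeySelector<MyEvent, Long>() {
        @Override
        public Long getKey(MyEvent event) {
            return event.getId();
        }
    });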

Cheers,
Gordon


On 27 June 2017 at 10:40:47 PM, Urs Schoenenberger 
(urs.schoenenber...@tngtech.com) wrote:

Hi,  

if I use DataStream::partitionCustom, will the partition number that my  
custom Partitioner returns always be equal to getIndexOfThisSubtask  
in the following operator?  

A test case with different parallelisms seems to suggest this is true,  
but the Javadoc seems ambiguous to me since the Partitioner doc talks  
about the "partition index" while the RuntimeContext doc talks about the  
"index of the parallel subtask".  

Thanks,  
Urs  

--  
Urs Schönenberger - urs.schoenenber...@tngtech.com  

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring  
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke  
Sitz: Unterföhring * Amtsgericht München * HRB 135082  


Re: Question about Checkpoint

2017-06-26 Thread Tzu-Li (Gordon) Tai
Hi Desheng,

Welcome to the community!

What you’re asking alludes to the question: how does Flink support end-to-end 
(from external source to external sink, e.g. Kafka to database) exactly-once 
delivery?
Whether or not that is supported depends on the guarantees of the source and 
sink and how they work with Flink’s checkpointing; see the overview of the 
guarantees here [1].

I think you already understand how sources work with checkpoints for 
exactly-once.
For sinks to be exactly-once, the external system needs to be able to 
participate in the checkpointing mechanism, so it depends on what the sink is.
The participation is usually in some form of transaction, which is only 
committed once a job's checkpoint is fully completed.
If the sink doesn’t support this, you can also achieve effective end-to-end 
exactly-once by making your database writes / updates idempotent, so that 
replaying the stream since t1 does not affect the end results.
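
For illustration, a minimal sketch of an idempotent sink (the table, columns, event type, and PostgreSQL-style upsert are all assumptions); replaying events after a restore rewrites the same rows instead of duplicating them:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class IdempotentJdbcSink extends RichSinkFunction<MyEvent> {

    private transient Connection connection;
    private transient PreparedStatement upsert;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:postgresql://db-host/mydb", "user", "password");
        upsert = connection.prepareStatement(
            "INSERT INTO my_table (event_id, field_value) VALUES (?, ?) " +
            "ON CONFLICT (event_id) DO UPDATE SET field_value = EXCLUDED.field_value");
    }

    @Override
    public void invoke(MyEvent event) throws Exception {
        upsert.setString(1, event.getId());    // a deterministic key is what makes the write idempotent
        upsert.setString(2, event.getValue());
        upsert.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (upsert != null) upsert.close();
        if (connection != null) connection.close();
    }
}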

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/guarantees.html


On 27 June 2017 at 11:36:00 AM, ZalaCheung (gzzhangdesh...@corp.netease.com) 
wrote:

Hi Flink Community,

I am new to Flink and now looking at checkpoint of Flink.

After reading the document, I am still confused. Here is scene:

I have a datastream that finally flows to a database sink. I update one of the 
fields in the database based on the incoming stream. I have now completed a 
snapshot, say t1, and snapshot t2 is in progress. After snapshot t1 completes, 
I update my database several times. Then, BEFORE snapshot t2 completes, my task 
fails.

Based on my understanding of checkpointing in Flink, I will recover from snapshot 
t1 and re-execute the stream between t1 and t2. But I've already updated my 
database several times after t1. Does Flink deal with this issue?


Best,
Desheng Zhang
E-mail: gzzhangdesh...@corp.netease.com;



Re: Kinesis connector SHARD_GETRECORDS_MAX default value

2017-06-22 Thread Tzu-Li (Gordon) Tai
Hi Steffen,

Thanks for bringing up the discussion!

I think the reason why SHARD_GETRECORDS_INTERVAL_MILLIS was defaulted to 0 in 
the first place was that we didn’t want to give the false impression that some 
latency is introduced in Flink with the default settings.
To this end, I’m leaning towards not touching SHARD_GETRECORDS_INTERVAL_MILLIS.
Ideally, the docs in this section [1] should guide the user to tweak this 
setting if they’re having issues with competing apps also consuming the shards. 
Could also improve this if you think the notice for this issue needs to be more 
prominent.

However, I do suggest changing SHARD_GETRECORDS_MAX to a higher value. 100 
seems to be too small by default.
Increasing that should also be safe in the sense that it would not introduce 
unexpected behavior changes for the user.
What do you think?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html#internally-used-kinesis-apis
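
For anyone tuning this in the meantime, a minimal sketch of overriding both values on the consumer config (region/credentials setup and the stream name are placeholders; the numbers are just examples):

Properties consumerConfig = new Properties();
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-west-1");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

DataStream<String> kinesisStream = env.addSource(
    new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig));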

On 23 June 2017 at 4:30:10 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi Gordon,  

Regarding the value for SHARD_GETRECORDS_INTERVAL_MILLIS.  

The best practice would be to set this value to 1000, as this setting  
allows other applications to read from the same Kinesis stream. However,  
this may considerably increase the latency of the respective Flink  
application, as events are only consumed once per second.  

Alternatively, the default value could be as low as 200 (you can currently  
only read from a shard 5 times a second, hence values lower than 200 are  
undesirable), which reduces the latency of a single Flink application  
but causes undesirable effects when multiple applications consume events  
from the same Kinesis stream.  

I'd prefer setting the default to 1000, but I wanted to get your opinion  
on this before I submit the PR.  

Cheers,  
Steffen  

On 24/04/2017 00:39, Tzu-Li (Gordon) Tai wrote:  
> Thanks for filing the JIRA!  
>  
> Would you also be up to open a PR to for the change? That would be very  
> very helpful :)  
>  
> Cheers,  
> Gordon  
>  
> On 24 April 2017 at 3:27:48 AM, Steffen Hausmann  
> (stef...@hausmann-family.de <mailto:stef...@hausmann-family.de>) wrote:  
>  
>> Hi Gordon,  
>>  
>> thanks for looking into this and sorry it took me so long to file the  
>> issue: https://issues.apache.org/jira/browse/FLINK-6365.  
>>  
>> Really appreciate your contributions for the Kinesis connector!  
>>  
>> Cheers,  
>> Steffen  
>>  
>> On 22/03/2017 20:21, Tzu-Li (Gordon) Tai wrote:  
>> > Hi Steffan,  
>> >  
>> > I have to admit that I didn’t put too much thoughts in the default  
>> > values for the Kinesis consumer.  
>> >  
>> > I’d say it would be reasonable to change the default values to follow  
>> > KCL’s settings. Could you file a JIRA for this?  
>> >  
>> > In general, we might want to reconsider all the default values for  
>> > configs related to the getRecords call, i.e.  
>> > - SHARD_GETRECORDS_MAX  
>> > - SHARD_GETRECORDS_INTERVAL_MILLIS  
>> > - SHARD_GETRECORDS_BACKOFF_*  
>> >  
>> > Cheers,  
>> > Gordon  
>> >  
>> > On March 23, 2017 at 2:12:32 AM, Steffen Hausmann  
>> > (stef...@hausmann-family.de <mailto:stef...@hausmann-family.de>) wrote:  
>> >  
>> >> Hi there,  
>> >>  
>> >> I recently ran into problems with a Flink job running on an EMR cluster  
>> >> consuming events from a Kinesis stream receiving roughly 15k  
>> >> event/second. Although the EMR cluster was substantially scaled and CPU  
>> >> utilization and system load were well below any alarming threshold, the  
>> >> processing of events of the stream increasingly fell behind.  
>> >>  
>> >> Eventually, it turned out that the SHARD_GETRECORDS_MAX defaults to 100  
>> >> which is apparently causing too much overhead when consuming events from  
>> >> the stream. Increasing the value to 5000, a single GetRecords call to  
>> >> Kinesis can retrieve up to 10k records, made the problem go away.  
>> >>  
>> >> I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low  
>> >> (100x less than it could be). The Kinesis Client Library defaults to  
>> >> 5000 and it's recommended to use this default value:  
>> >> http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower.
>> >>   
>> >>  
>> >>  
>> >> Thanks for the clarification!  
>> >>  
>> >> Cheers,  
>> >> Steffen  


Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

2017-06-22 Thread Tzu-Li (Gordon) Tai
Thanks a lot Andrea!


On 21 June 2017 at 8:36:32 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:

Hi Gordon, sadly no news since the last message. 

In the end I skipped over the issue; I was not able to solve it. I'll try to 
provide a runnable example asap. 

Thank you. 

Andrea 



-- 
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558p13896.html
 
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com. 


RE: Kafka and Flink integration

2017-06-20 Thread Tzu-Li (Gordon) Tai
Yes, POJOs can contain other nested POJO types. You just have to make sure that 
the nested field is either public, or has a corresponding public getter- and 
setter- method that follows the Java beans naming conventions.
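
For illustration, a minimal sketch (the class and field names are made up):

public class Order {
    public long orderId;          // public field: fine under the POJO rules

    private Customer customer;    // nested POJO, exposed via getter/setter

    public Order() {}             // public no-argument constructor required

    public Customer getCustomer() { return customer; }
    public void setCustomer(Customer customer) { this.customer = customer; }
}

public class Customer {
    public String name;
    public String country;

    public Customer() {}
}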


On 21 June 2017 at 12:20:31 AM, nragon (nuno.goncal...@wedotechnologies.com) 
wrote:

Can I have a POJO that is a composition of other POJOs? 
My custom object has many dependencies, and in order to refactor it I would 
have to change another 5 classes as well. 



-- 
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13874.html
 
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com. 


RE: Kafka and Flink integration

2017-06-20 Thread Tzu-Li (Gordon) Tai
Hi Nuno,

In general, if it is possible, it is recommended that you map your generic
classes to Tuples / POJOs [1].
For Tuples / POJOs, Flink will create specialized serializers for them,
whereas for generic classes (i.e. types which cannot be treated as POJOs)
Flink simply falls back to using Kryo for them.
The actual performance gain may depend a bit on what the original generic
class type looked like.

One other thing probably to look at is enabling object reuse for
de-/serialization. However, be aware that the user code needs to be aware
of this, otherwise it may lead to unexpected errors.
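
As a minimal sketch (only do this if your functions do not hold on to or modify input records across invocations):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();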

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types

On 20 June 2017 at 11:24:03 PM, Nuno Rafael Goncalves (
nuno.goncal...@wedotechnologies.com) wrote:

I believe there is some performance impact while de/serializing, which is
“normal”. What I’m trying to understand is whether there are any tips to improve
this process, for instance tuples vs. general class types. Do you know if
it’s worth it to map a custom object into a tuple just for the de/serialization
process?

According to jfr analysis, kryo methods are hit a lot.







-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com]
Sent: 20 de junho de 2017 16:04
To: user@flink.apache.org
Cc: Nuno Rafael Goncalves 
Subject: Re: Kafka and Flink integration



No, this is only necessary if you want to register a custom serializer
itself [1]. Also, in case you are wondering about registerKryoType() - this
is only needed as a performance optimisation.



What exactly is your problem? What are you trying to solve?

(I can't read JFR files here, and from what I read at Oracle's site, this
requires a commercial license, too...)





Nico



[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/

custom_serializers.html



On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote:

> Do I need to use registerTypeWithKryoSerializer() in my execution

> environment?

> My serialization into kafka is done with the following snippet

>

> try (ByteArrayOutputStream byteArrayOutStream = new
ByteArrayOutputStream();

> Output output = new Output(byteArrayOutStream)) {

>   Kryo kryo = new Kryo();

>   kryo.writeClassAndObject(output, event);

>   output.flush();

>   return byteArrayOutStream.toByteArray();

> } catch (IOException e) {

>   return null;

> }

>

> "event" is my custom object.

>

> then i desirialize it in flink's kafka consumer

> try (ByteArrayInputStream byteArrayInStream = new

> ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream,

> bytes.length)) {

>   Kryo kryo = new Kryo();

>   return kryo.readClassAndObject(input);

> } catch (IOException e) {

>   return null;

> }

>

> Thanks

>

>

>

> --

> View this message in context:

>
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-a

> nd-Flink-integration-tp13792p13841.html Sent from the Apache Flink User

> Mailing List archive. mailing list archive at Nabble.com.




Re: Guava version conflict

2017-06-19 Thread Tzu-Li (Gordon) Tai
Thanks a lot! Please keep me updated with this :)


On 19 June 2017 at 6:33:15 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Ok, I'll let you know as soon as I recompile Flink 1.3.x.

Thanks,
Flavio

On Mon, Jun 19, 2017 at 7:26 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Flavio,

It’s most likely related to a problem with Maven.
I’m pretty sure this actually isn’t a problem anymore. Could you verify by 
rebuilding Flink and see if the problem remains? Thanks a lot.

Best,
Gordon


On 16 June 2017 at 6:25:10 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

That’s actually what I’m trying to figure out right now, what had changed since 
then ...


On 16 June 2017 at 12:23:57 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

I've never tested with flink 1.3.0, I have the problem with Flink 1.2.1. Didn't 
you say that you also had non-shaded guava dependencies in the flink dist jar 
some days ago?

On Fri, Jun 16, 2017 at 12:19 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Flavio,

I was just doing some end-to-end rebuild Flink + cluster execution with ES sink 
tests, and it seems like the Guava shading problem isn’t there anymore in the 
flink-dist jar.

On the `release-1.3` branch, built with Maven 3.0.5, the Guava dependencies in 
flink-dist are all properly shaded.
Tested with ES 2.3.5 (the default Elasticsearch 2 connector version) and ES 
2.4.1 (overwritten the ES 2 version and rebuilt the ES connector), all worked 
fine without the Guava conflict issue.

So I’m pretty sure that if the problem still exists for you, the conflict would 
have come from other dependencies in your code.

Cheers,
Gordon


On 15 June 2017 at 8:24:48 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi Gordon,
any news on this?

On Mon, Jun 12, 2017 at 9:54 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
This seems like a shading problem then.
I’ve tested this again with Maven 3.0.5, even without building against CDH 
Hadoop binaries the flink-dist jar contains non-shaded Guava dependencies.

Let me investigate a bit and get back to this!

Cheers,
Gordon


On 8 June 2017 at 2:47:02 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

On an empty machine (with Ubuntu 14.04.5 LTS) and an empty maven local repo I 
did:
git clone https://github.com/apache/flink.git && cd flink && git checkout 
tags/release-1.2.1
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
cd flink-dist
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
jar tf target/flink-1.2.1-bin/flink-1.2.1/lib/flink-dist_2.10-1.2.1.jar  | grep 
MoreExecutors
And I still see guava dependencies:

org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$2.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$3.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$4.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutor.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutorService.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$ListenableScheduledTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors.class
com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concur

Re: How can I get last successful checkpoint id in sink?

2017-06-19 Thread Tzu-Li (Gordon) Tai
Hi!

The last completed checkpoint ID should be obtainable using the monitoring REST 
API [1], under the url “/jobs/{jobID}/checkpoints/“.

It is also visible in the JobManager Web UI under the “checkpoints” tab of each 
job. The web UI fetches its information using the monitoring REST API, so 
anything available there should also be retrievable via the REST API.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html


On 18 June 2017 at 2:24:31 PM, yunfan123 (yunfanfight...@foxmail.com) wrote:

It seems flink not support this?  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-can-I-get-last-successful-checkpoint-id-in-sink-tp13815.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Guava version conflict

2017-06-18 Thread Tzu-Li (Gordon) Tai
Hi Flavio,

It’s most likely related to a problem with Maven.
I’m pretty sure this actually isn’t a problem anymore. Could you verify by 
rebuilding Flink and see if the problem remains? Thanks a lot.

Best,
Gordon

On 16 June 2017 at 6:25:10 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

That’s actually what I’m trying to figure out right now, what had changed since 
then ...


On 16 June 2017 at 12:23:57 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

I've never tested with flink 1.3.0, I have the problem with Flink 1.2.1. Didn't 
you say that you also had non-shaded guava dependencies in the flink dist jar 
some days ago?

On Fri, Jun 16, 2017 at 12:19 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Flavio,

I was just doing some end-to-end rebuild Flink + cluster execution with ES sink 
tests, and it seems like the Guava shading problem isn’t there anymore in the 
flink-dist jar.

On the `release-1.3` branch, built with Maven 3.0.5, the Guava dependencies in 
flink-dist are all properly shaded.
Tested with ES 2.3.5 (the default Elasticsearch 2 connector version) and ES 
2.4.1 (overwritten the ES 2 version and rebuilt the ES connector), all worked 
fine without the Guava conflict issue.

So I’m pretty sure that if the problem still exists for you, the conflict would 
have come from other dependencies in your code.

Cheers,
Gordon


On 15 June 2017 at 8:24:48 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi Gordon,
any news on this?

On Mon, Jun 12, 2017 at 9:54 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
This seems like a shading problem then.
I’ve tested this again with Maven 3.0.5, even without building against CDH 
Hadoop binaries the flink-dist jar contains non-shaded Guava dependencies.

Let me investigate a bit and get back to this!

Cheers,
Gordon


On 8 June 2017 at 2:47:02 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

On an empty machine (with Ubuntu 14.04.5 LTS) and an empty maven local repo I 
did:
git clone https://github.com/apache/flink.git && cd flink && git checkout 
tags/release-1.2.1
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
cd flink-dist
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
jar tf target/flink-1.2.1-bin/flink-1.2.1/lib/flink-dist_2.10-1.2.1.jar  | grep 
MoreExecutors
And I still see guava dependencies:

org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$2.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$3.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$4.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutor.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutorService.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$ListenableScheduledTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors.class
com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

It seems that it mixes together Guava 11 (probably coming from CDH dependencies) 
and Guava 18 classes.

Also using maven 3.0.5 lead to the same output :(

Best,
Flavio 

On Wed, Jun 7, 2017 at 6:21 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Yes, those shoul

Re: Kafka and Flink integration

2017-06-16 Thread Tzu-Li (Gordon) Tai
Hi!

It’s generally recommended to register your classes with Kryo, to avoid 
the somewhat inefficient classname writing.
Also, depending on the case, to decrease serialization overhead, nothing really 
beats specific custom serialization. So, you can also register specific 
serializers for Kryo to use for the type.
If you need to store these custom objects as managed state for your operators, 
you can also have your own custom Flink TypeSerializer for that.
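
For illustration, a minimal sketch of both kinds of registration (MyEvent and MyEventKryoSerializer are placeholders for your own classes):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// register the class so Kryo writes a small tag instead of the full class name
env.getConfig().registerKryoType(MyEvent.class);

// or register a dedicated Kryo serializer for the type
env.getConfig().registerTypeWithKryoSerializer(MyEvent.class, MyEventKryoSerializer.class);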

Best,
Gordon

On 16 June 2017 at 12:27:06 PM, nragon (nuno.goncal...@wedotechnologies.com) 
wrote:

I have to produce custom objects into Kafka and read them with Flink. Any  
tuning advice for using Kryo, such as class registration or something like  
that? Any examples?  

Thanks  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Re: Guava version conflict

2017-06-16 Thread Tzu-Li (Gordon) Tai
That’s actually what I’m trying to figure out right now, what had changed since 
then ...


On 16 June 2017 at 12:23:57 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

I've never tested with flink 1.3.0, I have the problem with Flink 1.2.1. Didn't 
you say that you also had non-shaded guava dependencies in the flink dist jar 
some days ago?

On Fri, Jun 16, 2017 at 12:19 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Flavio,

I was just doing some end-to-end rebuild Flink + cluster execution with ES sink 
tests, and it seems like the Guava shading problem isn’t there anymore in the 
flink-dist jar.

On the `release-1.3` branch, built with Maven 3.0.5, the Guava dependencies in 
flink-dist are all properly shaded.
Tested with ES 2.3.5 (the default Elasticsearch 2 connector version) and ES 
2.4.1 (overwritten the ES 2 version and rebuilt the ES connector), all worked 
fine without the Guava conflict issue.

So I’m pretty sure that if the problem still exists for you, the conflict would 
have come from other dependencies in your code.

Cheers,
Gordon


On 15 June 2017 at 8:24:48 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi Gordon,
any news on this?

On Mon, Jun 12, 2017 at 9:54 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
This seems like a shading problem then.
I’ve tested this again with Maven 3.0.5; even without building against CDH 
Hadoop binaries, the flink-dist jar contains non-shaded Guava dependencies.

Let me investigate a bit and get back to this!

Cheers,
Gordon


On 8 June 2017 at 2:47:02 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

On an empty machine (with Ubuntu 14.04.5 LTS) and an empty maven local repo I 
did:
git clone https://github.com/apache/flink.git && cd flink && git checkout 
tags/release-1.2.1
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
cd flink-dist
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
jar tf target/flink-1.2.1-bin/flink-1.2.1/lib/flink-dist_2.10-1.2.1.jar  | grep 
MoreExecutors
And I still see guava dependencies:

org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$2.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$3.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$4.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutor.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutorService.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$ListenableScheduledTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors.class
com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

It seems that it mixes together guava 11 (probably coming from CDH dependencies) 
and guava 18 classes.

Also using Maven 3.0.5 leads to the same output :(

Best,
Flavio 

On Wed, Jun 7, 2017 at 6:21 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Yes, those should not be in the flink-dist jar, so the root reason should be 
that the shading isn’t working properly for your custom build.

If possible, could you try building Flink again with a lower Maven version as 
specified in the doc, and see if that works?
If so, it could be that Maven 3.3.x 

Re: Guava version conflict

2017-06-16 Thread Tzu-Li (Gordon) Tai
Hi Flavio,

I was just doing some end-to-end rebuild Flink + cluster execution with ES sink 
tests, and it seems like the Guava shading problem isn’t there anymore in the 
flink-dist jar.

On the `release-1.3` branch, built with Maven 3.0.5, the Guava dependencies in 
flink-dist are all properly shaded.
Tested with ES 2.3.5 (the default Elasticsearch 2 connector version) and ES 
2.4.1 (overwritten the ES 2 version and rebuilt the ES connector), all worked 
fine without the Guava conflict issue.

So I’m pretty sure that if the problem still exists for you, the conflict would 
have come from other dependencies in your code.

Cheers,
Gordon


On 15 June 2017 at 8:24:48 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi Gordon,
any news on this?

On Mon, Jun 12, 2017 at 9:54 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
This seems like a shading problem then.
I’ve tested this again with Maven 3.0.5; even without building against CDH 
Hadoop binaries, the flink-dist jar contains non-shaded Guava dependencies.

Let me investigate a bit and get back to this!

Cheers,
Gordon


On 8 June 2017 at 2:47:02 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

On an empty machine (with Ubuntu 14.04.5 LTS) and an empty maven local repo I 
did:
git clone https://github.com/apache/flink.git && cd flink && git checkout 
tags/release-1.2.1
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
cd flink-dist
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
jar tf target/flink-1.2.1-bin/flink-1.2.1/lib/flink-dist_2.10-1.2.1.jar  | grep 
MoreExecutors
And I still see guava dependencies:

org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$2.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$3.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$4.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutor.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutorService.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$ListenableScheduledTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors.class
com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

It seems that it mixes together guava 11 (probably coming from CDH dependencies) 
and guava 18 classes.

Also using Maven 3.0.5 leads to the same output :(

Best,
Flavio 

On Wed, Jun 7, 2017 at 6:21 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Yes, those should not be in the flink-dist jar, so the root reason should be 
that the shading isn’t working properly for your custom build.

If possible, could you try building Flink again with a lower Maven version as 
specified in the doc, and see if that works?
If so, it could be that Maven 3.3.x simply isn’t shading properly even with the 
double compilation trick.


On 7 June 2017 at 6:17:15 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

What I did was to take the sources of the new ES connector and copy them into 
my code.
Flink was compiled with maven 3.3+ but I did the double compilation as 
specified in the Flink build section.
In flink dist I see guava classes, e.g.:

com/google/common/util/concurrent/MoreExecutors$1.

Re: Flink cluster : Client is not connected to any Elasticsearch nodes!

2017-06-16 Thread Tzu-Li (Gordon) Tai
Hi Dhinesh,

One other thing that came to mind: the Elasticsearch 2 connector, by default, 
uses ES 2.3.5.
If you’re using an Elasticsearch 2 version higher than that, you 
need to build the connector with the matching version.
When running lower-version clients against a higher-version ES 
installation, this exception is very common.

Best,
Gordon


On 6 June 2017 at 6:39:52 PM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) wrote:

Hi Dhinesh,

Could it be that you didn’t configure the network binding address of the ES 
installation properly?
You need to make sure it isn’t bound to 127.0.0.1, which I think in some 
Elasticsearch versions is the default binding.
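
For example (just a sketch; the address is a placeholder for whatever is reachable 
from the machines running the Flink tasks), in elasticsearch.yml:

network.host: 192.168.1.10

so that the transport client can actually connect to the node from the Flink cluster.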

Also, just a reminder if you haven’t done so, please make sure that the ES 
dependencies are properly bundled for cluster execution. See [1] for details on 
this.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html

On 6 June 2017 at 12:01:54 AM, dhinesh raja (dhinesh.r...@bizruntime.com) wrote:

Dear Team,

I am running a Flink streaming job with the Elasticsearch 2 connector. I am able to 
run it in my Eclipse, but when I run it on a local Flink cluster I get this error. Could 
you please help me with this? I have attached my code. I am using Flink 1.2.0 and 
Elasticsearch 2.x.


 java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
at 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:748)

--
Thanks & Regards

Dhinesh Raja. M




Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

2017-06-16 Thread Tzu-Li (Gordon) Tai
Hi Andrea,

I’ve rallied back to this and wanted to check on the status. Have you managed 
to solve this in the end, or is this still a problem for you?

If it’s still a problem, would you be able to provide a complete runnable 
example job that can reproduce the problem (ideally via a git branch I can 
clone and run :))?
This would help me with digging a bit more into the issue. Thanks a lot!

Best,
Gordon


On 8 June 2017 at 6:58:46 PM, Andrea Spina (andrea.sp...@radicalbit.io) wrote:

Hi guys,  

thank you for your interest. Yes @Flavio, I tried both the 1.2.0 and 1.3.0  
versions.  
Following Gordon's suggestion I tried setting setReference to false, but sadly  
it didn't help. What I did then was to declare a custom serializer as  
follows:  

class BlockSerializer extends Serializer[Block] with Serializable {

  override def read(kryo: Kryo, input: Input, block: Class[Block]): Block = {
    val serializer = new SparseMatrixSerializer

    val blockData = kryo.readObject(input, classOf[SparseMatrix], serializer)
    new Block(blockData)
  }

  override def write(kryo: Kryo, output: Output, block: Block): Unit = {
    val serializer = new SparseMatrixSerializer

    kryo.register(classOf[SparseMatrix], serializer)
    kryo.writeObject(output, block.blockData, serializer)

    output.close()
  }

}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def read(kryo: Kryo, input: Input, sparse: Class[SparseMatrix]): SparseMatrix = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    val numRows = input.readInt
    val numCols = input.readInt
    val colPtrs = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val rowIndices = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val data = kryo.readObject(input, classOf[java.util.ArrayList[Double]], collectionDoubleSerializer).asScala.toArray

    input.close()

    new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs = colPtrs, rowIndices = rowIndices, data = data)
  }

  override def write(kryo: Kryo, output: Output, sparseMatrix: SparseMatrix): Unit = {

    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)

    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    kryo.register(classOf[java.util.ArrayList[Int]], collectionIntSerializer)
    kryo.register(classOf[java.util.ArrayList[Double]], collectionDoubleSerializer)

    output.writeInt(sparseMatrix.numRows)
    output.writeInt(sparseMatrix.numCols)
    kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.data.toList.asJava, collectionDoubleSerializer)

    output.close()
  }

}

What I obtained is the same exception as before, but with a different accessed  
index and size.  

Caused by: java.lang.Exception: The data preparation for task 'CHAIN  
GroupReduce (GroupReduce at  
my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103))
  
-> Map (Map at  
my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))'
  
, caused an error: Error obtaining the sorted input: Thread 'SortMerger  
Reading Thread' terminated due to an exception: Index: 1, Size: 0  
at  
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)  
at  
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)  
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)  
at java.lang.Thread.run(Thread.java:745)  
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:  
Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1,  
Size: 0  
at  
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
  
at  
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)  
at  
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
  
at  
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)  
... 3 more  
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'  
terminated due to an exception: Index: 1, Size: 0  
at  
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
  
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0  
at java.util.ArrayList.rangeCheck(ArrayList.java:653)  
at 

Re: dynamic add sink to flink

2017-06-12 Thread Tzu-Li (Gordon) Tai
Hi,

The Flink Kafka Producer allows writing to multiple topics besides the default 
topic.
To do this, you can override the configured default topic by implementing the 
`getTargetTopic` method on the `KeyedSerializationSchema`.
That method is invoked for each record, and if a non-null value is returned, the record 
will be routed to that topic instead of the default one.
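
For example, a rough sketch (note that `MyEvent` and its `getTargetKafkaTopic()` / 
`toJson()` accessors are placeholders for your own record type, not part of the 
Flink API):

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import java.nio.charset.StandardCharsets;

public class RoutingSchema implements KeyedSerializationSchema<MyEvent> {

    @Override
    public byte[] serializeKey(MyEvent element) {
        return null; // no key in this sketch
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        return element.toJson().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        // route each record based on its content; returning null falls back
        // to the default topic configured on the producer
        return element.getTargetKafkaTopic();
    }
}

You would then pass an instance of this schema to a single FlinkKafkaProducer010, 
together with the default topic and the producer Properties, instead of adding one 
sink per target topic.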

Does this address what you have in mind?

Cheers,
Gordon

On 10 June 2017 at 6:20:59 AM, qq (468137...@qq.com) wrote:

Hi: 
We use Flink as a router for our Kafka. We read from one Kafka cluster and write to a lot of 
different Kafka clusters and topics, but it takes too much time to start the Flink job when we 
want to add other Kafka sinks. So, is there any way to dynamically add a sink to Flink, or just 
start the Flink job faster? I think the reason for the slow job start is the large number of 
Kafka sinks. 

We now use the demo code like this, 

 
splitStream = stream.split(by the content) 
for ((k, v) <- map) { 
splitStream.select(k).addSink(new kafkaSink(v)) 
} 
 








Re: Guava version conflict

2017-06-12 Thread Tzu-Li (Gordon) Tai
This seems like a shading problem then.
I’ve tested this again with Maven 3.0.5; even without building against CDH 
Hadoop binaries, the flink-dist jar contains non-shaded Guava dependencies.

Let me investigate a bit and get back to this!

Cheers,
Gordon

On 8 June 2017 at 2:47:02 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

On an empty machine (with Ubuntu 14.04.5 LTS) and an empty maven local repo I 
did:
git clone https://github.com/apache/flink.git && cd flink && git checkout 
tags/release-1.2.1
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
cd flink-dist
/opt/devel/apache-maven-3.3.9/bin/mvn clean install 
-Dhadoop.version=2.6.0-cdh5.9.0 -Dhbase.version=1.2.0-cdh5.9.0 
-Dhadoop.core.version=2.6.0-mr1-cdh5.9.0 -DskipTests -Pvendor-repos
jar tf target/flink-1.2.1-bin/flink-1.2.1/lib/flink-dist_2.10-1.2.1.jar  | grep 
MoreExecutors
And I still see guava dependencies:

org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/hadoop/shaded/com/google/common/util/concurrent/MoreExecutors.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$2.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$3.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$4.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application$1.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$Application.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutor.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$DirectExecutorService.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$ListenableScheduledTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
org/apache/flink/shaded/com/google/common/util/concurrent/MoreExecutors.class
com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

It seems that it mixes together guava 11 (probably coming from CDH dependencies) 
and guava 18 classes.

Also using Maven 3.0.5 leads to the same output :(

Best,
Flavio 

On Wed, Jun 7, 2017 at 6:21 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Yes, those should not be in the flink-dist jar, so the root reason should be 
that the shading isn’t working properly for your custom build.

If possible, could you try building Flink again with a lower Maven version as 
specified in the doc, and see if that works?
If so, it could be that Maven 3.3.x simply isn’t shading properly even with the 
double compilation trick.


On 7 June 2017 at 6:17:15 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

What I did was to take the sources of the new ES connector and copy them into 
my code.
Flink was compiled with maven 3.3+ but I did the double compilation as 
specified in the Flink build section.
In flink dist I see guava classes, e.g.:

com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

Is it a problem of the shading with Maven 3.3+?

Best,
Flavio

On Wed, Jun 7, 2017 at 5:48 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Ah, I assumed you were running 1.3.0 (since you mentioned “new” ES connector).

Another thing to check, if you built Flink yourself, make sure you’re not using 
Maven 3.3+. There are shading problems when Flink is built with Maven versions 
higher than that.
The flink-dist jar should not contain any non-shaded Guava dependencies, could 
you also quickly check that?

On 7 June 2017 at 5:42:28 PM, 

Re: Fink: KafkaProducer Data Loss

2017-06-11 Thread Tzu-Li (Gordon) Tai
Hi Ninad,

Thanks for the logs!
Just to let you know, I’ll continue to investigate this early next week.

Cheers,
Gordon

On 8 June 2017 at 7:08:23 PM, ninad (nni...@gmail.com) wrote:

I built Flink v1.3.0 with cloudera hadoop and am able to see the data loss.  

Here are the details:  

*tmOneCloudera583.log*  

Received session window task:  
*2017-06-08 15:10:46,131 INFO org.apache.flink.runtime.taskmanager.Task  
- TriggerWindow(ProcessingTimeSessionWindows(3),  
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec},
  
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) ->  
Sink: sink.http.sep (1/1) (3ef2f947096f3b0986b8ad334c7d5d3c) switched from  
CREATED to DEPLOYING.  

Finished checkpoint 2 (Synchronous part)  
2017-06-08 15:15:51,982 DEBUG  
org.apache.flink.streaming.runtime.tasks.StreamTask -  
TriggerWindow(ProcessingTimeSessionWindows(3),  
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec},
  
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) ->  
Sink: sink.http.sep (1/1) - finished synchronous part of checkpoint  
2.Alignment duration: 0 ms, snapshot duration 215 ms  
*  

The task failed before the completed checkpoint could be verified,  
i.e., I don't see the log line saying "Notification of complete checkpoint for  
task TriggerWindow" for checkpoint 2.  

*jmCloudera583.log*  

Job Manager received acks for checkpoint 2  

*2017-06-08 15:15:51,898 DEBUG  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received  
acknowledge message for checkpoint 2 from task  
3e980e4d3278ef0d0f5e0fa9591cc53d of job 3f5aef5e15a23bce627c05c94760fb16.  
2017-06-08 15:15:51,982 DEBUG  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received  
acknowledge message for checkpoint 2 from task  
3ef2f947096f3b0986b8ad334c7d5d3c of job 3f5aef5e15a23bce627c05c94760fb16*.  

Job Manager tried to restore from checkpoint 2.  

*2017-06-08 15:16:02,111 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore -  
Found 1 checkpoints in ZooKeeper.  
2017-06-08 15:16:02,111 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore -  
Trying to retrieve checkpoint 2.  
2017-06-08 15:16:02,122 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring  
from latest valid checkpoint: Checkpoint 2 @ 149693476  
6105 for 3f5aef5e15a23bce627c05c94760fb16.*  

*tmTwocloudera583.log*  

Task Manager tried to restore the data and was successful.  

*2017-06-08 15:16:02,258 DEBUG  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Restoring  
snapshot from state handles:  
[KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
  
endKeyGroup=127}, offsets=[13460, 13476, 13492, 13508, 13524, 13540, 13556,  
13572, 13588, 13604, 13620, 13636, 13652, 13668, 13684, 14566, 14582, 14598,  
14614, 14630, 14646, 14662, 14678, 14694, 14710, 14726, 14742, 14758, 14774,  
14790, 14806, 14822, 14838, 14854, 14870, 14886, 14902, 14918, 14934, 14950,  
14966, 14982, 14998, 15014, 15030, 15046, 15062, 15078, 15094, 15110, 15126,  
15142, 15158, 15174, 15190, 15206, 16088, 16104, 16120, 16136, 28818, 28834,  
28850, 28866, 28882, 28898, 28914, 28930, 28946, 28962, 28978, 28994, 29010,  
29026, 29042, 29058, 29074, 29090, 29106, 29122, 29138, 29154, 29170, 40346,  
40362, 40378, 40394, 40410, 40426, 40442, 40458, 40474, 40490, 40506, 40522,  
40538, 41420, 41436, 41452, 41468, 41484, 41500, 41516, 41532, 41548, 41564,  
41580, 41596, 41612, 41628, 41644, 41660, 41676, 41692, 41708, 41724, 41740,  
41756, 41772, 41788, 41804, 41820, 41836, 41852, 41868, 41884, 41900,  
41916]}, data=File State:  
hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/3f5aef5e15a23bce627c05c94760fb16/chk-2/aa076014-120b-4955-ab46-89094358ebeb
  
[41932 bytes]}].*  

But apparently, the restored state didn't have all the messages the window had  
received, because a few messages were not replayed and the Kafka sink didn't  
receive all the messages.  

Attaching the files here.  

jmCloudera583.log  

  
tmOneCloudera583.log  

  
tmTwoCloudera583.log  

  

BTW, I posted my test results and logs for regular Flink v1.3.0 on Jun 6,  
but don't see that post here. I did receive an email though. Hope you guys  
saw that.  

Thanks for your patience guys.  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13597.html
  
Sent from the Apache Flink User 

Re: In-transit Data Encryption in EMR

2017-06-11 Thread Tzu-Li (Gordon) Tai
Hi Vinay,

Apologies for the inactivity on this thread, I was occupied with some critical 
fixes for 1.3.1.

1. Can anyone please explain me how do you test if SSL is working correctly ? 
Currently I am just relying on the logs.

AFAIK, if any of the SSL configuration settings are enabled (*.ssl.enabled) and 
your job is running fine, then everything should be functioning.
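
One additional way to check (a sketch; host and port are placeholders, and this 
assumes the web UI is actually served over SSL in your setup) is to inspect the 
certificate presented by the JobManager endpoint, e.g.:

openssl s_client -connect <jobmanager-host>:8081 < /dev/null | openssl x509 -noout -subject -issuer

If the handshake succeeds and the printed subject/issuer match your generated 
certificate, that endpoint is using your SSL setup.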

2. Wild Card is not working with the keytool command, can you please let me 
know what is the issue with the following command:

The wildcard option only works for wildcarding subdomains.
For example, SAN=*.domain.com
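
So, as a sketch based on your command (the alias, keystore path, and domain are 
placeholders), something along these lines should work:

keytool -genkeypair -alias ca -keystore deploy-keys/ca.keystore -ext SAN=dns:*.yourdomain.com

rather than wildcarding part of a host name such as node1.*.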

On 9 June 2017 at 4:33:46 PM, vinay patil (vinay18.pa...@gmail.com) wrote:

Hi Guys,

Can anyone please provide a solution to my queries?

On Jun 8, 2017 11:30 PM, "Vinay Patil" <[hidden email]> wrote:
Hi Guys,

I am able to set up SSL correctly; however, the following command does not work 
correctly and results in the error I had mailed earlier:
flink run -m yarn-cluster -yt deploy-keys/ TestJob.jar

Few Doubts: 
1. Can anyone please explain me how do you test if SSL is working correctly ? 
Currently I am just relying on the logs.

2. Wild Card is not working with the keytool command, can you please let me 
know what is the issue with the following command:

keytool -genkeypair -alias ca -keystore: -ext SAN=dns:node1.* 


Regards,
Vinay Patil

On Mon, Jun 5, 2017 at 8:43 PM, vinay patil [via Apache Flink User Mailing List 
archive.] <[hidden email]> wrote:
Hi Gordon,

The yarn session gets created when I try to run the following command:
yarn-session.sh -n 4 -s 2 -jm 1024 -tm 3000 -d --ship deploy-keys/

However when I try to access the Job Manager UI, it gives me exception as :
javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: 
PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target

I am able to see the Job Manager UI after I imported the CA certificate into the Java 
truststore on the EMR master node:
keytool -keystore /etc/alternatives/jre/lib/security/cacerts -importcert -alias 
FLINKSSL -file ca.cer


Does this mean that SSL is configured correctly? I can see it in the Job Manager 
configuration and also in the logs. Is there any other way to verify?

Also, the keystore and truststore passwords should be masked in the logs, which 
is not the case.

2017-06-05 14:51:31,135 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: security.ssl.enabled, true
2017-06-05 14:51:31,136 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: security.ssl.keystore, deploy-keys/ca.keystore
2017-06-05 14:51:31,136 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: security.ssl.keystore-password, password
2017-06-05 14:51:31,136 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: security.ssl.key-password, password
2017-06-05 14:51:31,136 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: security.ssl.truststore, deploy-keys/ca.truststore
2017-06-05 14:51:31,136 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: security.ssl.truststore-password, password


Regards,
Vinay Patil




View this message in context: Re: In-transit Data Encryption in EMR
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Guava version conflict

2017-06-07 Thread Tzu-Li (Gordon) Tai
Yes, those should not be in the flink-dist jar, so the root reason should be 
that the shading isn’t working properly for your custom build.

If possible, could you try building Flink again with a lower Maven version as 
specified in the doc, and see if that works?
If so, it could be that Maven 3.3.x simply isn’t shading properly even with the 
double compilation trick.


On 7 June 2017 at 6:17:15 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

What I did was to take the sources of the new ES connector and copy them into 
my code.
Flink was compiled with maven 3.3+ but I did the double compilation as 
specified in the Flink build section.
In flink dist I see guava classes, e.g.:

com/google/common/util/concurrent/MoreExecutors$1.class
com/google/common/util/concurrent/MoreExecutors$SameThreadExecutorService.class
com/google/common/util/concurrent/MoreExecutors$ListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors$ScheduledListeningDecorator.class
com/google/common/util/concurrent/MoreExecutors.class

Is it a problem of the shading with Maven 3.3+?

Best,
Flavio

On Wed, Jun 7, 2017 at 5:48 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Ah, I assumed you were running 1.3.0 (since you mentioned “new” ES connector).

Another thing to check, if you built Flink yourself, make sure you’re not using 
Maven 3.3+. There are shading problems when Flink is built with Maven versions 
higher than that.
The flink-dist jar should not contain any non-shaded Guava dependencies, could 
you also quickly check that?

On 7 June 2017 at 5:42:28 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

I shaded the Elasticsearch dependency [1] and now the job works.
So I cannot run a job that needs guava 18 on Flink 1.2.1...

[1]  https://www.elastic.co/blog/to-shade-or-not-to-shade

On Wed, Jun 7, 2017 at 5:33 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Flavio,

Could there be another dependency in your job that requires a conflicting 
version (w.r.t. ES 2.4.1) of Guava?
I’ve just double checked the flink-dist jar, there doesn’t seem to be any 
non-shaded Guava dependencies there, so the conflict should not have been 
caused by Flink.

Cheers,
Gordon


On 7 June 2017 at 4:12:04 PM, Flavio Pompermaier (pomperma...@okkam.it) wrote:

Hi to all,
I'm trying to use the new ES connector to index data from Flink (with ES 2.4.1).
When I try to run it from Eclipse everything is ok, when I run it from the 
cluster I get the following exception:

java.lang.NoSuchMethodError: 
com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
    at org.elasticsearch.threadpool.ThreadPool.(ThreadPool.java:192)
    at 
org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)

In my fat jar there are the classes of Guava 18 (ES requires that version); 
Flink runs on CDH 5.9 (which uses Guava 11); in the flink-dist jar I think there 
are Guava 11 classes, while in flink-hadoop-compatibility there are shaded 
Guava 18 dependencies.

How can I make the job successfully run on the cluster?

Best,
Flavio



