Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-03 Thread Vergilio, Thalita
Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of the
JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I managed to get
the TaskManagers from different nodes and even different subnets to talk to the
JobManager.


This is how I created the services:


docker network create -d overlay overlay

docker service create --name jobmanager \
  --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} \
  -p 8081:8081 -p {{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p 6124:6124 -p 6125:6125 \
  --network overlay \
  --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager \
  --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} \
  -p 6121:6121 -p 6122:6122 \
  --network overlay \
  --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

However, I am still encountering errors further down the line. When I submit a
job using the Web UI, it fails because the JobManager can't talk to the
TaskManager on port 35033. I presume this is the taskmanager.data.port, which
needs to be pinned (or set to a range) and exposed when I create the service?
(A sketch of what I mean follows the stack trace below.)

Are there any other ports that I need to open at service creation time?


Connecting the channel failed: Connecting to remote task manager + '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
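
A sketch of what I mean (untested; this assumes taskmanager.data.port can be
set in the container's flink-conf.yaml, e.g. via a custom image or a mounted
config, so the TaskManager stops picking a random port like 35033):

# in flink-conf.yaml inside the taskmanager containers:
#   taskmanager.data.port: 6121

docker service create --name taskmanager \
  --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} \
  -p 6121:6121 -p 6122:6122 \
  --network overlay \
  --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager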




From: Piotr Nowojski 
Sent: 02 November 2017 14:26:32
To: Vergilio, Thalita
Cc: user@flink.apache.org
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes

Did you try to expose required ports that are listed in the README when 
starting the containers?

https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

Ports:
• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)

Piotrek

On 2 Nov 2017, at 14:44, javalass <t.vergilio4...@student.leedsbeckett.ac.uk> wrote:

I am using the Docker-Flink project in:
https://github.com/apache/flink/tree/master/flink-contrib/docker-flink

I am creating the services with the following commands:
docker network create -d overlay overlay
docker service create --name jobmanager \
  --env JOB_MANAGER_RPC_ADDRESS=jobmanager \
  -p 8081:8081 --network overlay \
  --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager \
  --env JOB_MANAGER_RPC_ADDRESS=jobmanager \
  --network overlay \
  --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager

I wonder if there's any configuration I'm missing. This is the error I get:
- Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 4, timeout: 4000 milliseconds)









Re: Negative values using latency marker

2017-11-03 Thread Nico Kruber
Hi Tovi,
if I see this correctly, the LatencyMarker gets its initial timestamp during
creation at the source, and the latency is reported as a metric at a sink by
comparing that initial timestamp with the current time.
If the clocks of the two machines involved diverge, e.g. the sink's clock
falling behind, the difference may be negative.
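
As a sketch (illustrative only; the actual implementation lives in Flink's
Java runtime):

// the marker is created at the source with the source machine's wall clock
case class LatencyMarker(markedTime: Long)

def latencyAtSink(marker: LatencyMarker): Long = {
  val now = System.currentTimeMillis() // the sink machine's wall clock
  // if the sink's clock lags the source's clock by more than the real transit
  // time, this difference comes out negative
  now - marker.markedTime
}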


Nico

On Thursday, 2 November 2017 17:58:51 CET Sofer, Tovi wrote:
> Hi group,
> 
> Can someone maybe elaborate on how the latency gauge shown by the latency
> marker can be negative?
> 
> 2017-11-02 18:54:56,842 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, mean=-5.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=1}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, mean=-5.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=2}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=3}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}}
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond: 12943.1167
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.3.numRecordsOutPerSecond: 12935.05
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.2.numRecordsOutPerSecond: 12946.9166
> 2017-11-02 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.Source: fixTopicConsumerSource.1.numRecordsOutPerSecond: 12926.3168
> 2017-11-02 18:54:56,844 INFO  com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:24753 max:19199891 mean:77637.6484 stddev:341333.9414842662 p50:40752.0 p75:49809.0 p95:190480.95 p98:539110.819994 p99:749224.889995 p999:3817927.9259998496
> 
> Regards,
> Tovi





FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Hello everyone,

I've been experimenting a bit with FlinkCEP, and I'm noticing weird failures
when a within-clause window closes at the same time a checkpoint occurs
(synchronous, on both the Fs and RocksDB backends, stored in HDFS).

The following is the relevant code:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // Checkpoints every minute
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))

// Pattern
val pattern =
  Pattern
    .begin[EventWithId]("flying").oneOrMore.where(_.event.instantValues.altitude >= 37000)
    .notNext("disappearing").where(_.event.instantValues.altitude >= 37000)
    .within(Time.minutes(1))

// Associate KeyedStream with pattern to be detected
val patternStream = CEP.pattern(streamById, pattern)

which causes failure on the second checkpoint with the following exception
stack trace:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)

Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
I'm sorry, I realized that the stack trace was poorly formatted; here it is
with better formatting:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint
2 for operator KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1 (1/1).}
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
operator KeyedCEPPatternOperator -> alert-select -> Sink:
notification-sink-1 (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: Could not find id for entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed
keyed state future.
at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
... 5 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: Could not find id for entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)
at
java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
... 7 more
Caused by: java.lang.IllegalStateException: Could not find id for
entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)
at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at
org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
at
org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
at
org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
at
org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
at
org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
at
org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)

Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Kostas Kloudas
Hi Federico,

I assume that you are using Flink 1.3, right?

In this case, in 1.4 we have fixed a bug that seems similar to your case:
https://issues.apache.org/jira/browse/FLINK-7756 


Could you try the current master to see if it fixes your problem?

Thanks,
Kostas

> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
> 
> Could not find id for entry:
>



Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Hi Kostas,

yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.

2017-11-03 15:21 GMT+01:00 Kostas Kloudas :

> Hi Federico,
>
> I assume that you are using Flink 1.3, right?
>
> In this case, in 1.4 we have fixed a bug that seems similar to your case:
> https://issues.apache.org/jira/browse/FLINK-7756
>
> Could you try the current master to see if it fixes your problem?
>
> Thanks,
> Kostas
>
> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
>  Could not find id for entry:
>
>
>
>


-- 
Federico D'Ambrosio


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Kostas Kloudas
Perfect! thanks a lot!

Kostas

> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
> 
> Hi Kostas, 
> 
> yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.
> 
> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas:
> Hi Federico,
> 
> I assume that you are using Flink 1.3, right?
> 
> In this case, in 1.4 we have fixed a bug that seems similar to your case:
> https://issues.apache.org/jira/browse/FLINK-7756 
> 
> 
> Could you try the current master to see if it fixes your problem?
> 
> Thanks,
> Kostas
> 
>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
>> 
>>  Could not find id for entry:
>> 
> 
> 
> 
> 
> -- 
> Federico D'Ambrosio



Re: Making external calls from a FlinkKafkaPartitioner

2017-11-03 Thread Nico Kruber
Hi Ron,
imho your code should be fine (except for a potential visibility problem on the 
changes of the non-volatile partitionMap member, depending on your needs).
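
For illustration, the kind of safe publication I mean, sketched in Scala
(hypothetical names; in your Java code, marking partitionMap volatile achieves
the same):

class PartitionMapHolder {
  // a volatile write/read pair makes the swapped-in map visible to the thread
  // calling partition(); readers see the old or the new map, never a torn one
  @volatile private var partitionMap: Map[Long, Seq[Int]] = Map.empty

  def refresh(): Unit = {
    // stand-in for the parsed result of the HTTP poll
    val fresh: Map[Long, Seq[Int]] = Map(1L -> Seq(1, 2, 3), 2L -> Seq(2))
    partitionMap = fresh // single volatile write publishes the new snapshot
  }

  def lookup(key: Long): Option[Seq[Int]] = partitionMap.get(key)
}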

The #open() method should be called (once) for each sink initialization 
(according to the javadoc) and then you should be fine with the asynchronous 
updater thread.
I'm including Gordon (cc'd) just to be sure as he may know more.


Nico

On Friday, 3 November 2017 04:06:02 CET Ron Crocker wrote:
> We have a system where the Kafka partition a message should go into is a
> function of a value in the message. Often, it’s value % # partitions, but
> for some values it’s not - it’s a specified list of partitions that changes
> over time. Our “simple Java library” that produces messages for this system
> also has a background thread that periodically polls a HTTP endpoint (at a
> rate of 1/minute as its default) to refresh that list of special cases.
> 
> It’s easy to create a FlinkKafkaPartitioner that does the mod operation;
> what I’m not so sure about is how to get this polling operation into the
> partitioner. I’m about to try it the obvious way (create a background
> thread that polls the URL and updates the partition map), but I wonder if
> that’s actually going to cause a bunch of problems for the Flink runtime.
> 
> Here’s the code that I have right now:
> public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {
>     private final String partitionerURL;
>     private final long updateIntervalInMillis;
>     private Map<Long, List<Integer>> partitionMap;
>     private ScheduledExecutorService executor;
> 
>     public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
>         this.partitionerURL = partitionerURL;
>         this.updateIntervalInMillis = updateIntervalInMillis;
>         this.partitionMap = new HashMap<>();
>     }
> 
>     @Override
>     public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
>         executor = Executors.newScheduledThreadPool(1);
>         executor.scheduleAtFixedRate(
>                 () -> updatePartitionMapRunnable(),
>                 updateIntervalInMillis,
>                 updateIntervalInMillis,
>                 TimeUnit.MILLISECONDS);
>     }
> 
>     private void updatePartitionMapRunnable() {
>         // Make synchronous request to partitionerURL
>         // This is a simple JSON that matches our data
>         String response = "{1:[1,2,3],2:[2]}";
>         // Replace current partitionMap with new HashMap from the response
>         this.partitionMap = convertResponseToMap(response);
>         // Replacing the current value of partitionMap with the updated version
>         // doesn't require synchronization
>     }
> 
>     private Map<Long, List<Integer>> convertResponseToMap(String response) {
>         Map<Long, List<Integer>> hashMap = new HashMap<>();
>         // Convert response to JSON structure and just use that?
>         // or Iterate and add to local hashMap
>         return hashMap;
>     }
> 
>     @Override
>     public int partition(Tuple2<Long, String> next, byte[] serializedKey,
>             byte[] serializedValue, int numPartitions) {
>         long myKey = next.f0;
> 
>         if (partitionMap.containsKey(myKey)) {
>             List<Integer> partitions = partitionMap.get(myKey);
>             myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
>         }
> 
>         return (int) (myKey % numPartitions);
>     }
> }
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835

signature.asc
Description: This is a digitally signed message part.


Re: Incremental checkpointing documentation

2017-11-03 Thread Nico Kruber
Hi Elias,
let me answer the questions to the best of my knowledge, but in general I 
think this is as expected.
(Let me give a link to the docs explaining the activation [1] for other 
readers first.)
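
For other readers, the activation itself boils down to this (a minimal sketch,
assuming the RocksDB state backend, which is the only backend supporting
incremental checkpoints, and flink-statebackend-rocksdb on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// the second constructor argument enables incremental checkpoints
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))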

On Friday, 3 November 2017 01:11:52 CET Elias Levy wrote:
> What is the interaction of incremental checkpointing and external
> checkpoints?

Externalized checkpoints may be incremental [2] (I'll fix the formatting error 
that is not rendering the arguments as a list, making them less visible)

> Any interaction with the state.checkpoints.num-retained config?

Yes, this remains the number of available checkpoints. There may, however, be 
more folders containing RocksDB state that was originally put into checkpoint 
X but is also still required in checkpoint X+10 or so. These files will be 
cleaned up once they are not needed anymore.

> Does incremental checkpointing require any maintenance?

No, state is cleaned up once it is not used/referenced anymore.

> Any interaction with savepoints?

No, a savepoint uses Flink's own data format and is not incremental [3].

> Does it perform better against certain "file systems"? E.g. is S3 not
> recommended for it? How about EFS?

I can't think of a reason this should be any different to non-incremental 
checkpoints. Maybe Stefan (cc'd) has some more info on this.

For more details on the whole topic, I can recommend Stefan's talk at the last 
Flink Forward [4] though.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/large_state_tuning.html#tuning-rocksdb
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/savepoints.html
[4] https://www.youtube.com/watch?v=dWQ24wERItM&index=36&list=PLDX4T_cnKjD0JeULl1X6iTn7VIkDeYX_X



Re: Using Flink Ml with DataStream

2017-11-03 Thread Adarsh Jain
Hi Chesnay,

Thanks for the reply, do you know how to serve using the trained model?

Where is the model saved?

Regards,
Adarsh




On Wed, Nov 1, 2017 at 4:46 PM, Chesnay Schepler  wrote:

> I don't believe this to be possible. The ML library works exclusively with
> the Batch API.
>
>
> On 30.10.2017 12:52, Adarsh Jain wrote:
>
>
> Hi,
>
> Is there a way to use Stochastic Outlier Selection (SOS) and/or SVM using
> CoCoA with streaming data.
>
> Please suggest and give pointers.
>
> Regards,
> Adarsh
>
>
>
>
>


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Hi Kostas,

I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it
didn't crash, so it was indeed the same underlying issue as in the JIRA you linked.

Do you happen to know when the 1.4 stable release is expected?

Thank you very much,
Federico

2017-11-03 15:25 GMT+01:00 Kostas Kloudas :

> Perfect! thanks a lot!
>
> Kostas
>
> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
> Hi Kostas,
>
> yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.
>
> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas :
>
>> Hi Federico,
>>
>> I assume that you are using Flink 1.3, right?
>>
>> In this case, in 1.4 we have fixed a bug that seems similar to your case:
>> https://issues.apache.org/jira/browse/FLINK-7756
>>
>> Could you try the current master to see if it fixes your problem?
>>
>> Thanks,
>> Kostas
>>
>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>> federico.dambro...@smartlab.ws> wrote:
>>
>>  Could not find id for entry:
>>
>>
>>
>>
>
>
> --
> Federico D'Ambrosio
>
>
>


-- 
Federico D'Ambrosio


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Kostas Kloudas
Hi Federico,

Thanks for trying it out! 
Great to hear that your problem was fixed!

The feature freeze for the release is going to be next week, and I would expect 
1 or 2 more weeks testing.
So I would say in 2.5 weeks. But this is of course subject to potential issues 
we may find during testing.

Cheers,
Kostas

> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
> 
> Hi Kostas,
> 
> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it
> didn't crash, so it was indeed the same underlying issue as in the JIRA you linked.
> 
> Do you happen to know when the 1.4 stable release is expected?
> 
> Thank you very much,
> Federico
> 
> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas:
> Perfect! thanks a lot!
> 
> Kostas
> 
>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
>> 
>> Hi Kostas, 
>> 
>> yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.
>> 
>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas:
>> Hi Federico,
>> 
>> I assume that you are using Flink 1.3, right?
>> 
>> In this case, in 1.4 we have fixed a bug that seems similar to your case:
>> https://issues.apache.org/jira/browse/FLINK-7756 
>> 
>> 
>> Could you try the current master to see if it fixes your problem?
>> 
>> Thanks,
>> Kostas
>> 
>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
>>> 
>>>  Could not find id for entry:   
>>>  
>> 
>> 
>> 
>> 
>> -- 
>> Federico D'Ambrosio
> 
> 
> 
> 
> -- 
> Federico D'Ambrosio



Re: Making external calls from a FlinkKafkaPartitioner

2017-11-03 Thread Ron Crocker
Thanks Nico -

Thanks for the feedback, and nice catch on the missing volatile. 

Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835

> On Nov 3, 2017, at 7:48 AM, Nico Kruber  wrote:
> 
> Hi Ron,
> imho your code should be fine (except for a potential visibility problem on 
> the 
> changes of the non-volatile partitionMap member, depending on your needs).
> 
> The #open() method should be called (once) for each sink initialization 
> (according to the javadoc) and then you should be fine with the asynchronous 
> updater thread.
> I'm including Gordon (cc'd) just to be sure as he may know more.
> 
> 
> Nico
> 
> On Friday, 3 November 2017 04:06:02 CET Ron Crocker wrote:
>> We have a system where the Kafka partition a message should go into is a
>> function of a value in the message. Often, it’s value % # partitions, but
>> for some values it’s not - it’s a specified list of partitions that changes
>> over time. Our “simple Java library” that produces messages for this system
>> also has a background thread that periodically polls a HTTP endpoint (at a
>> rate of 1/minute as its default) to refresh that list of special cases.
>> 
>> It’s easy to create a FlinkKafkaPartitioner that does the mod operation;
>> what I’m not so sure about is how to get this polling operation into the
>> partitioner. I’m about to try it the obvious way (create a background
>> thread that polls the URL and updates the partition map), but I wonder if
>> that’s actually going to cause a bunch of problems for the Flink runtime.
>> 
>> Here’s the code that I have right now:
>> public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {
>>     private final String partitionerURL;
>>     private final long updateIntervalInMillis;
>>     private Map<Long, List<Integer>> partitionMap;
>>     private ScheduledExecutorService executor;
>> 
>>     public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
>>         this.partitionerURL = partitionerURL;
>>         this.updateIntervalInMillis = updateIntervalInMillis;
>>         this.partitionMap = new HashMap<>();
>>     }
>> 
>>     @Override
>>     public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
>>         executor = Executors.newScheduledThreadPool(1);
>>         executor.scheduleAtFixedRate(
>>                 () -> updatePartitionMapRunnable(),
>>                 updateIntervalInMillis,
>>                 updateIntervalInMillis,
>>                 TimeUnit.MILLISECONDS);
>>     }
>> 
>>     private void updatePartitionMapRunnable() {
>>         // Make synchronous request to partitionerURL
>>         // This is a simple JSON that matches our data
>>         String response = "{1:[1,2,3],2:[2]}";
>>         // Replace current partitionMap with new HashMap from the response
>>         this.partitionMap = convertResponseToMap(response);
>>         // Replacing the current value of partitionMap with the updated version
>>         // doesn't require synchronization
>>     }
>> 
>>     private Map<Long, List<Integer>> convertResponseToMap(String response) {
>>         Map<Long, List<Integer>> hashMap = new HashMap<>();
>>         // Convert response to JSON structure and just use that?
>>         // or Iterate and add to local hashMap
>>         return hashMap;
>>     }
>> 
>>     @Override
>>     public int partition(Tuple2<Long, String> next, byte[] serializedKey,
>>             byte[] serializedValue, int numPartitions) {
>>         long myKey = next.f0;
>> 
>>         if (partitionMap.containsKey(myKey)) {
>>             List<Integer> partitions = partitionMap.get(myKey);
>>             myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
>>         }
>> 
>>         return (int) (myKey % numPartitions);
>>     }
>> }
>> Ron
>> —
>> Ron Crocker
>> Principal Engineer & Architect
>> ( ( •)) New Relic
>> rcroc...@newrelic.com
>> M: +1 630 363 8835



Re: Initialise side input state

2017-11-03 Thread Xingcan Cui
Hi Maxim,

thanks for the explanation. I think you can set a ValueState and a
ListState for the price and the purchase events, separately. On one hand,
when receiving a purchase event, you first check the price state. If it
exists, you just collect the PurchaseTotal result; otherwise you can
temporarily cache the event into the ListState. On the other hand, when
receiving a price event, you first update the state and check if there
exist some cached purchase events that need to be processed.

You can set a boolean flag in the function to avoid checking the purchase
state every time (since it takes extra time). Don't worry about the state
distribution problem. Flink will automatically divide them according to the
keys (productId in your example). For more information about the state, you
can refer to the state documentation in the Flink docs.
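
To make the buffering idea concrete, here is a minimal sketch (assuming Flink
1.3's Scala API and typed versions of your case classes; all names are
illustrative):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

case class Purchase(productId: Long, amount: Int)
case class PriceList(productId: Long, price: Int)
case class PurchaseTotal(productId: Long, totalAmount: Long)

class CalculateFunction extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] {

  // last known price for the current productId; null until the first price arrives
  private var price: ValueState[java.lang.Integer] = _
  // purchases buffered while no price is known yet for this key
  private var pending: ListState[Purchase] = _

  override def open(parameters: Configuration): Unit = {
    price = getRuntimeContext.getState(
      new ValueStateDescriptor("price", classOf[java.lang.Integer]))
    pending = getRuntimeContext.getListState(
      new ListStateDescriptor("pending-purchases", classOf[Purchase]))
  }

  override def processElement1(purchase: Purchase,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit = {
    if (price.value() != null) {
      out.collect(PurchaseTotal(purchase.productId, purchase.amount.toLong * price.value()))
    } else {
      pending.add(purchase) // no price known yet: buffer until processElement2 fills it
    }
  }

  override def processElement2(priceList: PriceList,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit = {
    price.update(priceList.price)
    // flush any purchases that arrived before the first price for this key
    pending.get().asScala.foreach { p =>
      out.collect(PurchaseTotal(p.productId, p.amount.toLong * priceList.price))
    }
    pending.clear()
  }
}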

Hope that helps.

Best,
Xingcan

On Fri, Nov 3, 2017 at 2:11 PM, Maxim Parkachov wrote:

> Hi Xingcan,
>
> On Fri, Nov 3, 2017 at 3:38 AM, Xingcan Cui  wrote:
>
>> Hi Maxim,
>>
>> if I understand correctly, you actually need to JOIN the fast stream with
>> the slow stream. Could you please share more details about your problem?
>>
>
> Sure I can explain more, with some example of pseudo-code. I have external
> DB with price list with following structure:
>
> case class PriceList(productId, price)
>
> My events are purchase events with following structure:
>
> case class Purchase(productId, amount)
>
> I would like to get final stream with TotalAmount = Amount*Price in
> structure like this:
>
> case class PurchaseTotal(productId, totalAmount)
>
> I have 2 corresponding input streams:
>
> val prices = env.addSource(new PriceListSource).keyBy(_.productId)
> val purchases = env.addSource(new PurchaseSource).keyBy(_.productId)
>
> PriceListSource delivers me all CHANGES to external DB table.
>
> Calculate function looks similar to:
>
> class CalculateFunction extends CoProcessFunction[Purchase, PriceList,
> PurchaseTotal] {
>
>   private var price: ValueState[Int] = _
>
>   override def processElement1... {
> out.collect(PurchaseTotal(purchase.productId, purchase.amount *
> priceList.value))
>   }
>
>   override def processElement2... {
> price.update(priceList.value)
>   }
> }
>
> And finally pipeline:
>
> purchases.connect(prices).process(new CalculateFunction).print
>
> The issue is: when I start the program, my price ValueState is empty, and it
> will never be populated with DB entries that are not updated (since the source
> only delivers changes).
> BTW, I cannot use AsyncIO to query the DB, because of several technical
> restrictions.
>
> 1. When you mentioned "they have the same key", did you mean all the data
>> get the same key or the logic should be applied with fast.key = slow.key?
>>
>
> I meant here that the productId in a purchase event definitely exists in the
> external price list DB (so, it is kind of an inner join)
>
>
>> 2. What should be done to initialize the state?
>>
>
> I need to read the external DB table and populate the price ValueState before
> processing the first purchase event.
>
> Hope this minimal example helps to understand.
> Maxim.
>
>
>>
>> Best,
>> Xingcan
>>
>>
>> On Fri, Nov 3, 2017 at 5:54 AM, Maxim Parkachov 
>> wrote:
>>
>>> Hi Flink users,
>>>
>>> I'm struggling with some basic concept and would appreciate some help.
>>> I have 2 Input streams, one is fast event stream and one is slow changing
>>> dimension. They have the same key and I use CoProcessFunction to store
>>> slow data in state and enrich fast data from this state. Everything
>>> works as expected.
>>>
>>> Before I start processing fast streams on the first run, I would like to
>>> completely initialise the state. I thought it could be done in open(), but I
>>> don't understand how it would be re-distributed across parallel operators.
>>>
>>> Another alternative would be to create a custom source and push all slow
>>> dimension data downstream, but I could not find a way to hold off processing
>>> the fast data until the state is initialised.
>>>
>>> I realise that FLIP-17 (Side Inputs) is what I need, but is there some other
>>> way to implement it now ?
>>>
>>> Thanks,
>>> Maxim.
>>>
>>>
>>
>
>


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Thank you very much for your speedy response, Kostas!

Cheers,
Federico

2017-11-03 16:26 GMT+01:00 Kostas Kloudas :

> Hi Federico,
>
> Thanks for trying it out!
> Great to hear that your problem was fixed!
>
> The feature freeze for the release is going to be next week, and I would
> expect 1 or 2 more weeks testing.
> So I would say in 2.5 weeks. But this is of course subject to potential
> issues we may find during testing.
>
> Cheers,
> Kostas
>
> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
> Hi Kostas,
>
> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it
> didn't crash, so it was indeed the same underlying issue as in the JIRA you linked.
>
> Do you happen to know when the 1.4 stable release is expected?
>
> Thank you very much,
> Federico
>
> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas :
>
>> Perfect! thanks a lot!
>>
>> Kostas
>>
>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>> federico.dambro...@smartlab.ws> wrote:
>>
>> Hi Kostas,
>>
>> yes, I'm using 1.3.2. I'll try the current master and I'll get back to
>> you.
>>
>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas :
>>
>>> Hi Federico,
>>>
>>> I assume that you are using Flink 1.3, right?
>>>
>>> In this case, in 1.4 we have fixed a bug that seems similar to your case:
>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>
>>> Could you try the current master to see if it fixes your problem?
>>>
>>> Thanks,
>>> Kostas
>>>
>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>> federico.dambro...@smartlab.ws> wrote:
>>>
>>>  Could not find id for entry:
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Federico D'Ambrosio
>>
>>
>>
>
>
> --
> Federico D'Ambrosio
>
>
>


-- 
Federico D'Ambrosio


Flink memory usage

2017-11-03 Thread AndreaKinn
Hi,
I would like to share some considerations about Flink memory consumption.
I have a cluster composed of three nodes: one used both as JM and TM, and the
other two as TMs.

I ran two otherwise identical applications on it (at different times). The only
difference is that in the second one I doubled every operator, essentially to
check what changes in resource usage.

Analysing the outcomes, on the CPU side the effort is indeed doubled.
Doing the same for memory, I got these results:

[image: memory usage charts for the two runs; not preserved in the archive]

which to me seems completely counterintuitive, since the results are
essentially equal. I can imagine that in the second case the memory was
effectively almost full, but why does Flink grab so much memory even in the
first case? How is this behaviour explained?


