RE: Flink on Mesos: containers question

2018-07-30 Thread NEKRASSOV, ALEXEI
Renjie,

In my observation Task Managers don’t run in Docker containers – they run as 
JVM processes directly on the VM.
The only Docker container is the one that runs Job Manager.

What am I missing?

Thanks,
Alex

From: Renjie Liu [mailto:liurenjie2...@gmail.com]
Sent: Friday, July 20, 2018 8:56 PM
To: Till Rohrmann
Cc: NEKRASSOV, ALEXEI; Fabian Hueske; user
Subject: Re: Flink on Mesos: containers question

Hi, Alexei:

What you pasted is expected behavior. The job manager and the two task 
managers should each run in their own Docker container.

13276 should be the job manager process, and it is the same process as 789. 
The process IDs differ because they are shown in different PID namespaces (a 
Linux kernel feature that Docker relies on, alongside cgroups).
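
For readers who want to verify this on the VM, here is a minimal sketch. It 
assumes a Linux kernel of 4.1 or later, where /proc/<pid>/status carries an 
NSpid line listing the process ID in every nested PID namespace, outermost 
first. The class name NsPid is made up for illustration and is not part of 
Flink, Mesos, or Docker.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class NsPid {
    /** Extracts the PIDs from the "NSpid:" line of a /proc/<pid>/status dump. */
    public static List<Integer> parse(String statusText) {
        List<Integer> pids = new ArrayList<>();
        for (String line : statusText.split("\n")) {
            if (line.startsWith("NSpid:")) {
                String rest = line.substring("NSpid:".length()).trim();
                for (String token : rest.split("\\s+")) {
                    if (!token.isEmpty()) {
                        pids.add(Integer.parseInt(token));
                    }
                }
            }
        }
        return pids;
    }

    public static void main(String[] args) throws IOException {
        // Pass a host PID (for example 13276); defaults to this JVM itself.
        String pid = args.length > 0 ? args[0] : "self";
        String status = new String(Files.readAllBytes(Paths.get("/proc", pid, "status")));
        System.out.println(parse(status));
    }
}
```

Run it on the host as "java NsPid 13276". If the explanation above holds, the 
list should contain both 13276 and 789; a single number would mean the process 
is not in a nested PID namespace.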

On Thu, Jul 19, 2018 at 10:00 PM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Alexei,

I actually never used Mesos with container images. I always used it in a way 
where the Mesos task directly starts the Java process.

Cheers,
Till

On Thu, Jul 19, 2018 at 2:44 PM NEKRASSOV, ALEXEI <an4...@att.com> wrote:
Till,

Any insight into how Flink components are containerized in Mesos?

Thanks!
Alex

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Monday, July 16, 2018 7:57 AM
To: NEKRASSOV, ALEXEI <an4...@att.com>
Cc: user@flink.apache.org; Till Rohrmann <trohrm...@apache.org>
Subject: Re: Flink on Mesos: containers question

Hi Alexei,

Till (in CC) is familiar with Flink's Mesos support in 1.4.x.

Best, Fabian

2018-07-13 15:07 GMT+02:00 NEKRASSOV, ALEXEI <an4...@att.com>:
Can someone please clarify how Flink on Mesos is containerized?

On a 5-node Mesos cluster I started Flink (1.4.2) with two Task Managers. 
Mesos shows a “flink” task and two “taskmanager” tasks, all on the same VM.
On that VM I see one Docker container running a process that seems to be the 
Mesos App Master:

$ docker ps -a
CONTAINER ID   IMAGE                             COMMAND                  CREATED        STATUS        PORTS   NAMES
97b6840466c0   mesosphere/dcos-flink:1.4.2-1.0   "/bin/sh -c /sbin/..."   41 hours ago   Up 41 hours           mesos-a0079d85-9ccb-4c43-8d31-e6b1ad750197
$ docker exec 97b6840466c0 /bin/ps -efww
UID        PID  PPID  C STIME TTY          TIME CMD
root         1     0  0 Jul11 ?        00:00:00 /bin/sh -c /sbin/init.sh
root         7     1  0 Jul11 ?        00:00:02 runsvdir -P /etc/service
root         8     7  0 Jul11 ?        00:00:00 runsv flink
root       629     0  0 Jul12 pts/0    00:00:00 /bin/bash
root       789     8  1 Jul12 ?        00:09:16
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtaskmanager.numberOfTaskSlots=1 
-Dparallelism.default=1 -Djobmanager.rpc.address=localhost 
-Dmesos.resourcemanager.framework.role=* 
-Dsecurity.kerberos.login.use-ticket-cache=true
root      1027     0  0 12:54 ?        00:00:00 /bin/ps -efww

Then on the VM itself I see another process with the same command line as the 
one in the container:

root     13276  9689  1 Jul12 ?        00:09:18
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtask

RE: Flink on Mesos: containers question

2018-07-19 Thread NEKRASSOV, ALEXEI
Till,

Any insight into how Flink components are containerized in Mesos?

Thanks!
Alex

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Monday, July 16, 2018 7:57 AM
To: NEKRASSOV, ALEXEI 
Cc: user@flink.apache.org; Till Rohrmann 
Subject: Re: Flink on Mesos: containers question

Hi Alexei,

Till (in CC) is familiar with Flink's Mesos support in 1.4.x.

Best, Fabian


Flink on Mesos: containers question

2018-07-13 Thread NEKRASSOV, ALEXEI
Can someone please clarify how Flink on Mesos is containerized?

On a 5-node Mesos cluster I started Flink (1.4.2) with two Task Managers. 
Mesos shows a "flink" task and two "taskmanager" tasks, all on the same VM.
On that VM I see one Docker container running a process that seems to be the 
Mesos App Master:

$ docker ps -a
CONTAINER ID   IMAGE                             COMMAND                  CREATED        STATUS        PORTS   NAMES
97b6840466c0   mesosphere/dcos-flink:1.4.2-1.0   "/bin/sh -c /sbin/..."   41 hours ago   Up 41 hours           mesos-a0079d85-9ccb-4c43-8d31-e6b1ad750197
$ docker exec 97b6840466c0 /bin/ps -efww
UID        PID  PPID  C STIME TTY          TIME CMD
root         1     0  0 Jul11 ?        00:00:00 /bin/sh -c /sbin/init.sh
root         7     1  0 Jul11 ?        00:00:02 runsvdir -P /etc/service
root         8     7  0 Jul11 ?        00:00:00 runsv flink
root       629     0  0 Jul12 pts/0    00:00:00 /bin/bash
root       789     8  1 Jul12 ?        00:09:16
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtaskmanager.numberOfTaskSlots=1 
-Dparallelism.default=1 -Djobmanager.rpc.address=localhost 
-Dmesos.resourcemanager.framework.role=* 
-Dsecurity.kerberos.login.use-ticket-cache=true
root      1027     0  0 12:54 ?        00:00:00 /bin/ps -efww

Then on the VM itself I see another process with the same command line as the 
one in the container:

root     13276  9689  1 Jul12 ?        00:09:18
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtaskmanager.numberOfTaskSlots=1 
-Dparallelism.default=1 -Djobmanager.rpc.address=localhost 
-Dmesos.resourcemanager.framework.role=* 
-Dsecurity.kerberos.login.use-ticket-cache=true

And I see two processes on the VM that seem to be related to Task Managers:

root     13688 13687  0 Jul12 ?        00:04:25 /docker-java-home/jre/bin/java
-Xms1448m -Xmx1448m -classpath 
/mnt/mesos/sandbox/flink/lib/flink-python_2.11-1.4.2.jar:/mnt/mesos/sandbox/flink/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/mnt/mesos/sandbox/flink/lib/log4j-1.2.17.jar:/mnt/mesos/sandbox/flink/lib/slf4j-log4j12-1.7.7.jar:/mnt/mesos/sandbox/flink/lib/flink-dist_2.11-1.4.2.jar:::
 -Dlog.file=flink-taskmanager.log 
-Dlog4j.configuration=file:/mnt/mesos/sandbox/flink/conf/log4j.properties 
-Dlogback.configurationFile=file:/mnt/mesos/sandbox/flink/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager 
-Dblob.server.port=23170 -Dmesos.artifact-server.port=23171 
-Djobmanager.heap.mb=256 -Djobmanager.rpc.address=localhost 
-Djobmanager.web.port=23168 -Dsecurity.kerberos.login.use-ticket-cache=true 
-Djobmanager.rpc.port=23169 -Dtaskmanager.memory.preallocate=true 
-Dtaskmanager.rpc.port=1027 -Dmesos.initial-tasks=2 
-Dmesos.resourcemanager.tasks.cpus=2 -Dtaskmanager.maxRegistrationDuration=5 
minutes -Dtaskmanager.data.port=1028 -Dparallelism.default=1 
-Dtaskmanager.numberOfTaskSlots=1 -Dmesos.resourcemanager.tasks.mem=2048 
-Dtaskmanager.heap.mb=512 -Dmesos.resourcemanager.framework.role=*
root     13892 13891  0 Jul12 ?        00:04:15 /docker-java-home/jre/bin/java
-Xms1448m -Xmx1448m -classpath 

Checkpointing when reading from files?

2018-05-21 Thread NEKRASSOV, ALEXEI
I want to add checkpointing to my program that reads from a set of files in a 
directory. Without checkpointing I use readFile():

  DataStream<String> text = env.readFile(
      new TextInputFormat(new Path(inputPath)),
      inputPath,
      inputProcessingMode,
      1000);

Should I use ContinuousFileMonitoringFunction / ContinuousFileReaderOperator to 
add checkpointing? Or is there an easier way?

How do I go from splits (that ContinuousFileMonitoringFunction provides) to 
actual strings? I'm not clear how ContinuousFileReaderOperator can be used.

  DataStreamSource<TimestampedFileInputSplit> split = env.addSource(
      new ContinuousFileMonitoringFunction<>(
          new TextInputFormat(new Path(inputPath)),
          inputProcessingMode,
          1,
          1000)
  );

Thanks,
Alex


Consolidated log for a job?

2018-05-14 Thread NEKRASSOV, ALEXEI
Is there a way to see logs from multiple Task Managers *all in one place* (for 
a running or a completed job)? Or do I need to check the logs on each Task 
Manager individually?

Thanks,
Alex Nekrassov



RE: RocksDBMapState example?

2018-04-10 Thread NEKRASSOV, ALEXEI
Yes, I've read the documentation on working with state.
It talks about MapState<UK, UV>. When I looked at Javadoc, I learned that 
MapState is an interface, with RocksDBMapState as one of the implementing 
classes.

I'm not sure what you mean by KeyedState; I don't see a class with that name.

I'm not clear how ValueState can be used to store key-value mapping. Can you 
please clarify?

Thanks,
Alex

-Original Message-
From: Dawid Wysakowicz [mailto:wysakowicz.da...@gmail.com] 
Sent: Tuesday, April 10, 2018 8:54 AM
To: NEKRASSOV, ALEXEI <an4...@att.com>
Cc: user@flink.apache.org
Subject: Re: RocksDBMapState example?

Hi Alexei,

You should not use RocksDBMapState directly. Have you gone through the doc 
page regarding working with state[1]?
I think you want to use KeyedState, given the size of your keyspace. 
Probably a way to go would be to key your stream; then even ValueState 
(which will be scoped to that key) might be sufficient.
You can further configure Flink to use RocksDB as the underlying state 
backend[2].

Regards,
Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#working-with-state
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html#state-backends
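
To illustrate what "scoped to that key" means, here is a toy model in plain 
Java. It is not Flink's implementation or API: the class 
KeyedValueStateModel and its setCurrentKey method are invented for this 
sketch. The point is only that value() and update() take no key argument, 
because the runtime selects the current key before user code runs, so one 
ValueState per keyed stream already behaves like a key-to-value table.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of key-scoped state; NOT Flink's implementation or API. */
public class KeyedValueStateModel<K, V> {
    // Stands in for the state backend (heap, RocksDB, ...).
    private final Map<K, V> backend = new HashMap<>();
    private K currentKey;

    /** In a real job the runtime does this implicitly for every record. */
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    /** Same shape as ValueState.value(): no key argument. */
    public V value() {
        return backend.get(currentKey);
    }

    /** Same shape as ValueState.update(): no key argument. */
    public void update(V newValue) {
        backend.put(currentKey, newValue);
    }

    public static void main(String[] args) {
        KeyedValueStateModel<String, Long> state = new KeyedValueStateModel<>();
        state.setCurrentKey("a");
        state.update(1L);
        state.setCurrentKey("b");
        state.update(2L);
        state.setCurrentKey("a");
        System.out.println(state.value()); // prints 1
    }
}
```

In an actual job the state handle would come from the runtime context of a 
rich function applied after keyBy(), using a state descriptor as described in 
[1], rather than from a constructor call.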

> On 9 Apr 2018, at 17:41, NEKRASSOV, ALEXEI <an4...@att.com> wrote:
> 
> Hi,
> 
> I’d like to use RocksDB to store a key-value mapping table (with 45 million 
> keys).
> Can someone please point me to an example of RocksDBMapState() constructor 
> invocation? Or an explanation of constructor arguments?..
> 
> Thanks,
> Alex Nekrassov
> nekras...@att.com



RE: RocksDBMapState example?

2018-04-10 Thread NEKRASSOV, ALEXEI
I looked at that code, but I’m still not clear.

new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);

columnFamily is determined by a 50-line function; is this necessary for a 
simple use case like mine? What should I use as the state descriptor in that 
function?..
The last argument is set to “this”; does this mean I need to implement 
AbstractKeyedStateBackend, like RocksDBKeyedStateBackend does?

Again, I’m looking for a simple equivalent to
new HashMap();

or to
JedisPool pool = new JedisPool(new JedisPoolConfig(), redisHost);
Jedis jedis = pool.getResource();

Thanks,
Alex

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Monday, April 09, 2018 11:48 AM
To: user 
Subject: Re: RocksDBMapState example?

Hi,
Have you looked at the ctor call in
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
around line 1261?

Cheers


RocksDBMapState example?

2018-04-09 Thread NEKRASSOV, ALEXEI
Hi,

I'd like to use RocksDB to store a key-value mapping table (with 45 million 
keys).
Can someone please point me to an example of RocksDBMapState() constructor 
invocation? Or an explanation of constructor arguments?..

Thanks,
Alex Nekrassov
nekras...@att.com


RE: How does setMaxParallelism work

2018-03-29 Thread NEKRASSOV, ALEXEI
Is there an auto-scaling feature in Flink, where I start with a parallelism of 
(for example) 1, but Flink notices I have a high volume of data to process and 
automatically increases the parallelism of a running job?

Thanks,
Alex

-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com] 
Sent: Wednesday, March 28, 2018 8:54 AM
To: Data Engineer 
Cc: Jörn Franke ; user@flink.apache.org
Subject: Re: How does setMaxParallelism work

Flink does not decide the parallelism based on your job.
There is a default parallelism (configured via parallelism.default [1], by 
default 1) which is used if you do not specify it yourself.


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options
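
As a rough model of the behavior described in this thread (not Flink code; the 
class ParallelismModel and its resolve method are hypothetical), the 
parallelism comes from the most specific setting that is present, and the max 
parallelism only acts as an upper bound. The sketch lumps 
env.setParallelism() and "flink run -p" together as the job level, and it 
assumes that exceeding the bound is rejected.

```java
import java.util.OptionalInt;

/** Toy model of how an operator's parallelism is picked; not Flink code. */
public class ParallelismModel {
    public static int resolve(OptionalInt operatorLevel,   // operator.setParallelism(n)
                              OptionalInt jobLevel,        // env.setParallelism(n) or -p
                              int configDefault,           // parallelism.default
                              OptionalInt maxParallelism)  // setMaxParallelism(n)
    {
        int parallelism;
        if (operatorLevel.isPresent()) {
            parallelism = operatorLevel.getAsInt();
        } else if (jobLevel.isPresent()) {
            parallelism = jobLevel.getAsInt();
        } else {
            parallelism = configDefault;
        }
        // The max parallelism never raises the value; it only bounds it.
        if (maxParallelism.isPresent() && parallelism > maxParallelism.getAsInt()) {
            throw new IllegalArgumentException(
                "parallelism " + parallelism + " exceeds max parallelism "
                    + maxParallelism.getAsInt());
        }
        return parallelism;
    }

    public static void main(String[] args) {
        // Only setMaxParallelism(3) was called: falls back to parallelism.default.
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.empty(), 1, OptionalInt.of(3)));
        // setParallelism(3) was called: three subtasks.
        System.out.println(resolve(OptionalInt.of(3), OptionalInt.empty(), 1, OptionalInt.empty()));
    }
}
```

Under this model the two observations quoted below are consistent: 
setMaxParallelism(3) alone leaves the job at the default of 1, while 
setParallelism(3) gives 3 subtasks.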

On 28/03/18 13:21, Data Engineer wrote:
> Agreed. But how did Flink decide that it should allot 1 subtask? Why 
> not
> 2 or 3?
> I am trying to understand the implications of using setMaxParallelism 
> vs setParallelism
> 
> On Wed, Mar 28, 2018 at 2:58 PM, Nico Kruber wrote:
> 
> Hi James,
> the number of subtasks being used is defined by the parallelism, the max
> parallelism, however, "... determines the maximum parallelism to which
> you can scale operators" [1]. That is, once set, you cannot ever (even
> after restarting your program from a savepoint) increase the operator's
> parallelism above this value. The actual parallelism can be set per job
> in your program but also in the flink client:
> flink run -p   
> 
> 
> Nico
> 
> 
> 
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
> 
> On 28/03/18 09:25, Data Engineer wrote:
> > I have a sample application that reads around 2 GB of csv files,
> > converts each record into Avro object and sends it to kafka.
> > I use a custom FileReader that reads the files in a directory.
> > I have set taskmanager.numberOfTaskSlots to 4.
> > I see that if I use setParallelism(3), 3 subtasks are created. But if I
> > use setMaxParallelism(3), only 1 subtask is created.
> >
> > On Wed, Mar 28, 2018 at 12:29 PM, Jörn Franke wrote:
> >
> >     What was the input format, the size and the program that you tried
> >     to execute
> >
> >     On 28. Mar 2018, at 08:18, Data Engineer wrote:
> >
> >>     I went through the explanation on MaxParallelism in the official
> >>     docs here:
> >>     https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
> >>
> >>     However, I am not able to figure out how Flink decides the
> >>     parallelism value.
> >>     For instance, if I setMaxParallelism to 3, I see that for my job,
> >>     there is only 1 subtask that is created. How did Flink decide that
> >>     1 subtask was enough?
> >>
> >>     Regards,
> >>     James
> >
> >
> 
> --
> Nico Kruber | Software Engineer
> data Artisans
> 
> Follow us @dataArtisans
> --
> Join Flink Forward - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
> data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
> 
> 




timeWindow emits records before window ends?

2018-03-27 Thread NEKRASSOV, ALEXEI
Hello,

With time characteristic set to IngestionTime I expected 
"timeWindow(Time.minutes(3))" to NOT produce any records in the first 3 minutes 
of running the job, and yet it does emit the record before 3 minutes elapse.
Am I doing something wrong? Or is my understanding of timeWindow incorrect?

For example, in Flink UI I see:

TriggerWindow(TumblingEventTimeWindows(180000), 
AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@7c810ef9,
 aggFunction=nextgen.McdrAggregator@7d7758be}, EventTimeTrigger(), 
WindowedStream.aggregate(WindowedStream.java:752)) -> Map

With "duration" 42s and "records sent" 689516.

I expected no records would be sent out until 180000 ms elapse.

Thanks,
Alex Nekrassov
nekras...@att.com
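
One possible explanation, not confirmed anywhere in this thread: tumbling 
time windows are aligned to the epoch, not to the moment the job starts, so 
the first 3-minute (180000 ms) window can close well before 3 minutes of 
runtime have passed. The sketch below uses the same arithmetic as Flink's 
TimeWindow.getWindowStartWithOffset; the class name WindowAlignment and the 
sample timestamp are made up for illustration.

```java
public class WindowAlignment {
    /** Start of the tumbling window that contains the given timestamp. */
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 3 * 60 * 1000L;               // 3-minute window, in ms
        long firstRecord = 1_522_152_100_000L;    // hypothetical ingestion timestamp
        long start = windowStart(firstRecord, 0, size);
        long end = start + size;
        // The window is due as soon as the watermark passes its end,
        // which is less than a full window after the first record.
        System.out.println("window end is " + (end - firstRecord) + " ms after the first record");
    }
}
```

With IngestionTime the records carry the wall-clock time at the source, which 
matches the TumblingEventTimeWindows and EventTimeTrigger entries shown in the 
UI output above.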



POJO default constructor - how is it used by Flink?

2018-03-09 Thread NEKRASSOV, ALEXEI
Hi,

I added a default constructor to the class that represents messages flowing 
through my Flink job graph - to satisfy Flink POJO requirements.
Although I don't call that default constructor explicitly, the logs show that 
it is called anyway.

Why is this happening?

In my test, for 77 incoming messages I see 77 calls to my "real" constructor, 
but also another 77 calls to the default constructor, which essentially create 
objects with no values.
Is this expected, or am I doing something wrong?

Thanks,
Alex Nekrassov
nekras...@att.com
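
A likely cause, offered as a sketch and not as Flink's actual code: a 
reflection-based POJO serializer rebuilds each object on the receiving side by 
calling the no-argument constructor and then filling in the fields, so one 
extra default-constructor call per message is what one would expect whenever 
records are serialized between operators. The class PojoRoundTrip, its nested 
Message class, and the map used as the "wire format" are all invented for 
this illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

public class PojoRoundTrip {
    public static class Message {
        static int defaultCtorCalls = 0;
        static int realCtorCalls = 0;
        public String id;
        public long amount;

        public Message() { defaultCtorCalls++; }

        public Message(String id, long amount) {
            realCtorCalls++;
            this.id = id;
            this.amount = amount;
        }
    }

    /** Copies the public instance fields into a map (stand-in for bytes on the wire). */
    public static Map<String, Object> serialize(Object pojo) {
        try {
            Map<String, Object> out = new HashMap<>();
            for (Field f : pojo.getClass().getFields()) {
                if (!Modifier.isStatic(f.getModifiers())) {
                    out.put(f.getName(), f.get(pojo));
                }
            }
            return out;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Rebuilds the object: default constructor first, then field by field. */
    public static <T> T deserialize(Class<T> type, Map<String, Object> fields) {
        try {
            T instance = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> e : fields.entrySet()) {
                type.getField(e.getKey()).set(instance, e.getValue());
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Message sent = new Message("m-1", 42L);
        Message received = deserialize(Message.class, serialize(sent));
        System.out.println(received.id + " " + received.amount
            + " real=" + Message.realCtorCalls + " default=" + Message.defaultCtorCalls);
    }
}
```

In this model the objects created by the default constructor do not stay 
empty: their fields are populated immediately afterwards, which is why the 
POJO rules ask for a public no-argument constructor in the first place.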