Re: Java types

2018-01-10 Thread Boris Lublinsky
More questions
In Scala my DataProcessor is defined as
class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, 
Double] with CheckpointedFunction {
And it is used as follows
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)

// Merge streams
data
  .connect(models)
  .process(DataProcessorKeyed())
When I am doing the same thing in Java
public class DataProcessorKeyed extends 
CoProcessFunction<Tuple2<String, Winerecord.WineRecord>, Tuple2<String, ModelToServe>, Double> 
implements CheckpointedFunction {
Which I am using as follows
// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
.flatMap(new ModelDataConverter(), new 
TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, 
TypeInformation.of(ModelToServe.class)))
.keyBy(0);
DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
.flatMap(new DataDataConverter(), new 
TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, 
TypeInformation.of(Winerecord.WineRecord.class)))
.keyBy(0);

// Merge streams
data
.connect(models)
.process(new DataProcessorKeyed());
I am getting an error

Error:(68, 17) java: no suitable method found for keyBy(int)
method 
org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<Object>)
 is not applicable
  (argument mismatch; int cannot be converted to 
scala.collection.Seq<Object>)
method 
org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.Function1<T,K>, org.apache.flink.api.common.typeinfo.TypeInformation<K>)
 is not applicable
  (cannot infer type-variable(s) K
(actual and formal argument lists differ in length))
So it assumes key/value pairs for the coprocessor

Why is there such a difference between the APIs?
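
(For comparison, a minimal sketch of keying the same tuple stream with the Java DataStream API. The error above references org.apache.flink.streaming.api.scala.DataStream, so it seems to be the Scala API class that is being resolved here; class names below are taken from the snippets above.)

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Assuming models is the DataStream<Tuple2<String, ModelToServe>> built above,
// the Java API accepts either a field position (keyBy(0)) or an explicit selector:
KeyedStream<Tuple2<String, ModelToServe>, String> keyedModels = models
        .keyBy(new KeySelector<Tuple2<String, ModelToServe>, String>() {
            @Override
            public String getKey(Tuple2<String, ModelToServe> value) {
                return value.f0;
            }
        });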

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Jan 10, 2018, at 6:20 PM, Boris Lublinsky  
> wrote:
> 
> I am trying to convert Scala code (which works fine) to Java.
> The Scala code is:
> // create a Kafka consumers
> // Data
> val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>   DATA_TOPIC,
>   new ByteArraySchema,
>   dataKafkaProps
> )
> 
> // Model
> val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>   MODELS_TOPIC,
>   new ByteArraySchema,
>   modelKafkaProps
> )
> 
> // Create input data streams
> val modelsStream = env.addSource(modelConsumer)
> val dataStream = env.addSource(dataConsumer)
> 
> // Read data from streams
> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>   .flatMap(BadDataHandler[ModelToServe])
>   .keyBy(_.dataType)
> val data = dataStream.map(DataRecord.fromByteArray(_))
>   .flatMap(BadDataHandler[WineRecord])
>   .keyBy(_.dataType)
> Now I am trying to rewrite it in Java and I am fighting with the requirement of 
> providing types where they should be obvious.
> 
> // create a Kafka consumers
> // Data
> FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
> ModelServingConfiguration.DATA_TOPIC,
> new ByteArraySchema(),
> dataKafkaProps);
> 
> // Model
> FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
> ModelServingConfiguration.MODELS_TOPIC,
> new ByteArraySchema(),
> modelKafkaProps);
> 
> // Create input data streams
> DataStream<byte[]> modelsStream = env.addSource(modelConsumer, 
> PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
> DataStream<byte[]> dataStream = env.addSource(dataConsumer, 
> PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
> // Read data from streams
> DataStream<Tuple2<String, ModelToServe>> models = modelsStream
>  .flatMap(new ModelConverter(), new 
> TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, 
> TypeInformation.of(ModelToServe.class)));
> 
> Am I missing something similar to import org.apache.flink.api.scala._
>  in Java?
> 
> If this is the only way, does this seem right?
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/
> 



Java types

2018-01-10 Thread Boris Lublinsky
I am trying to convert Scala code (which works fine) to Java.
The Scala code is:
// create a Kafka consumers
// Data
val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  DATA_TOPIC,
  new ByteArraySchema,
  dataKafkaProps
)

// Model
val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  MODELS_TOPIC,
  new ByteArraySchema,
  modelKafkaProps
)

// Create input data streams
val modelsStream = env.addSource(modelConsumer)
val dataStream = env.addSource(dataConsumer)

// Read data from streams
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)
Now I am trying to rewrite it in Java and I am fighting with the requirement of 
providing types where they should be obvious.

// create a Kafka consumers
// Data
FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
ModelServingConfiguration.DATA_TOPIC,
new ByteArraySchema(),
dataKafkaProps);

// Model
FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
ModelServingConfiguration.MODELS_TOPIC,
new ByteArraySchema(),
modelKafkaProps);

// Create input data streams
DataStream<byte[]> modelsStream = env.addSource(modelConsumer, 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
DataStream<byte[]> dataStream = env.addSource(dataConsumer, 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
 .flatMap(new ModelConverter(), new 
TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, 
TypeInformation.of(ModelToServe.class)));

Am I missing something similar to import org.apache.flink.api.scala._
 in Java?

If this is the only way, does this seem right?
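
(As a side note, a minimal sketch of one way to provide the result type in the Java API without passing the TypeInformation into flatMap directly, using returns() with a TypeHint; whether this fits your ModelConverter is an assumption:)

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// Declare the produced type on the returned operator instead of inside the flatMap call.
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
        .flatMap(new ModelConverter())
        .returns(TypeInformation.of(new TypeHint<Tuple2<String, ModelToServe>>() {}));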

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/



Re: Anyone got Flink working in EMR with KinesisConnector

2018-01-10 Thread xiatao123
got the issue fixed after applying patch from
https://github.com/apache/flink/pull/4150



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Anyone got Flink working in EMR with KinesisConnector

2018-01-10 Thread Tao Xia
Hi All,
  I ran into an exception after deploying our app in EMR. It seems like the
connection to Kinesis failed. Has anyone got the Flink KinesisConnector working in
EMR?

Release label:emr-5.11.0
Hadoop distribution:Amazon 2.7.3
Applications:Flink 1.3.2

java.lang.IllegalStateException: Socket not created by this factory
at org.apache.http.util.Asserts.check(Asserts.java:34)
at
org.apache.http.conn.ssl.SSLSocketFactory.isSecure(SSLSocketFactory.java:435)
at
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:186)
at
org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:326)
at
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
at
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
at
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:835)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1940)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1910)
at
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:656)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.describeStream(KinesisProxy.java:361)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:323)
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:231)
at
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:430)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:202)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

Thanks,
Tao


Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

2018-01-10 Thread Jayant Ameta
Hi,
When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is not
firing. However, the trigger fires when using a custom timestamp extractor
with a similar watermark.

Sample code below:
1. Assigner as an anonymous class, which works fine:

AssignerWithPeriodicWatermarks<Tuple2> assigner = new
AssignerWithPeriodicWatermarks<Tuple2>() {

@Override
public long extractTimestamp(Tuple2 element, long
previousElementTimestamp) {
return System.currentTimeMillis();
}

@Override
public final Watermark getCurrentWatermark() {
// this guarantees that the watermark never goes backwards.
return new Watermark(System.currentTimeMillis()-100);
}
};


2. BoundedOutOfOrdernessTimestampExtractor assigner, which doesn't work:

AssignerWithPeriodicWatermarks<Tuple2> assigner = new
BoundedOutOfOrdernessTimestampExtractor<Tuple2>(Time.milliseconds(100)) {

@Override
public long extractTimestamp(Tuple2 element) {
return System.currentTimeMillis();
}
};


Do you see any difference in the approaches?
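
(For reference, a rough sketch of how BoundedOutOfOrdernessTimestampExtractor derives its watermark internally; this is paraphrased, not the exact Flink source, and the field names are approximate:)

// The extractor tracks the maximum event timestamp seen so far and emits a
// watermark that lags behind it by the configured out-of-orderness, whereas
// the anonymous assigner above derives the watermark from the wall clock directly.
// currentMaxTimestamp, lastEmittedWatermark and maxOutOfOrdernessMillis are
// internal fields of the extractor.
public final Watermark getCurrentWatermark() {
    long potentialWatermark = currentMaxTimestamp - maxOutOfOrdernessMillis;
    if (potentialWatermark >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWatermark;
    }
    return new Watermark(lastEmittedWatermark);
}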

- Jayant


Re: BucketingSink broken in flink 1.4.0 ?

2018-01-10 Thread Kyle Hamlin
I'm having similar issues after moving from 1.3.2 to 1.4.0

My mailing list thread: BucketingSink doesn't work anymore moving from
1.3.2 to 1.4.0


I'm not actually using hdfs as my sink. I'll be using s3 as my final sink
but I get the following error even when I've given a local file path to the
BucketingSink.

java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink.
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI:
hdfs://localhost:12345/
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 more
Caused by: java.lang.ClassCastException






On Wed, Jan 10, 2018 at 1:39 PM Chesnay Schepler  wrote:

> Your analysis looks correct, the code in question will never properly
> detect hadoop file systems. I'll open a jira.
>
> Your suggestion to replace it with getUnguardedFileSystem() was my first
> instinct as well.
>
> Good job debugging this.
>
>
> On 10.01.2018 14:17, jelmer wrote:
>
> Hi I am trying to convert some jobs from flink 1.3.2 to flink 1.4.0
>
> But I am running into the issue that the bucketing sink will always try
> and connect to hdfs://localhost:12345/ instead of the hdfs url I have
> specified in the constructor
>
> If i look at the code at
>
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125
>
>
> It tries to create the hadoop filesystem like this
>
> final org.apache.flink.core.fs.FileSystem flinkFs =
> org.apache.flink.core.fs.FileSystem.get(path.toUri());
> final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
> ((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;
>
> But FileSystem.getUnguardedFileSystem will always return a
>
>
> But FileSystem.get will always return a SafetyNetWrapperFileSystem so the
> instanceof check will never indicate that its a hadoop filesystem
>
>
> Am I missing something, or is this a bug, and if so what would be the
> correct fix? I guess replacing FileSystem.get with
> FileSystem.getUnguardedFileSystem would fix it, but I am afraid I lack the
> context to know if that would be safe.
>
>
>


Re: Datastream broadcast with KeyBy

2018-01-10 Thread Fabian Hueske
Hi Anuj,

connecting a keyed stream and a broadcasted stream is not supported at the
moment but there is work in progress [1] to add this functionality for the
next release (Flink 1.5.0).

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-3659

2018-01-10 12:21 GMT+01:00 Piotr Nowojski :

> Hi,
>
> Could you elaborate what is the problem that you are having? What is the
> exception(s) that you are getting? I have tested such simple example and
> it’s seems to be working as expected:
>
> DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);
>
> DataStreamSource<Integer> confStream = env.fromElements(42);
>
> input.keyBy(new MyKeySelector()).connect(confStream.broadcast()).process(new 
> MyCoProcessFunction()).print();
>
>
> Thanks, Piotrek
>
> On 10 Jan 2018, at 10:01, anujk  wrote:
>
> Currently we have an Flink pipeline running with Data-Src —> KeyBy —>
> ProcessFunction.  State Management (with RocksDB) and Timers are working
> well.
> Now we have to extend this by having another Config Stream which we want to
> broadcast to all process operators. So wanted to connect the Data Stream
> with Config Stream (with Config Stream being broadcast) and use
> CoProcessFunction to handle both streams.
>
> KeyBy uses Hash based partitioning and also if we write CustomPartitioner
> it
> can return only one partition (Array of SelectedChannel option as in
> BroadcastPartitioner is not allowed).
> Would have liked this to work —
> dataStream.keyBy().connect(confStream.broadcast()).process(…
> RichCoProcessFunction()…)
> but it says both stream must be keyed.
>
> Is there any way to make this work?
>
> dataStream.connect(confStream.broadcast()).flatMap(...
> RichCoFlatMapFunction() …) ==> broadcast works. But we really want KeyBy
> and
> processFunction functionality.
>
> Thanks,
> Anuj
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
>
>


Re: BucketingSink broken in flink 1.4.0 ?

2018-01-10 Thread Chesnay Schepler
Your analysis looks correct, the code in question will never properly 
detect hadoop file systems. I'll open a jira.


Your suggestion to replace it with getUnguardedFileSystem() was my first 
instinct as well.


Good job debugging this.

On 10.01.2018 14:17, jelmer wrote:

Hi I am trying to convert some jobs from flink 1.3.2 to flink 1.4.0

But I am running into the issue that the bucketing sink will always 
try and connect to hdfs://localhost:12345/ instead of the hdfs url I 
have specified in the constructor.


If i look at the code at

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125 




It tries to create the hadoop filesystem like this

final org.apache.flink.core.fs.FileSystem flinkFs = 
org.apache.flink.core.fs.FileSystem.get(path.toUri());

final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;

But FileSystem.getUnguardedFileSystem will always return a


But FileSystem.get will always return a SafetyNetWrapperFileSystem so 
the instanceof check will never indicate that its a hadoop filesystem



Am I missing something, or is this a bug, and if so what would be the 
correct fix? I guess replacing FileSystem.get with 
FileSystem.getUnguardedFileSystem would fix it, but I am afraid I lack 
the context to know if that would be safe.





BucketingSink broken in flink 1.4.0 ?

2018-01-10 Thread jelmer
Hi I am trying to convert some jobs from flink 1.3.2 to flink 1.4.0

But I am running into the issue that the bucketing sink will always try and
connect to hdfs://localhost:12345/ instead of the hdfs url I have specified
in the constructor.

If i look at the code at

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125


It tries to create the hadoop filesystem like this

final org.apache.flink.core.fs.FileSystem flinkFs =
org.apache.flink.core.fs.FileSystem.get(path.toUri());
final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;

But FileSystem.getUnguardedFileSystem will always return a


But FileSystem.get will always return a SafetyNetWrapperFileSystem so the
instanceof check will never indicate that its a hadoop filesystem


Am I missing something, or is this a bug, and if so what would be the correct
fix? I guess replacing FileSystem.get with
FileSystem.getUnguardedFileSystem would fix it, but I am afraid I lack the
context to know if that would be safe.
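
For illustration, a sketch of what that replacement could look like inside BucketingSink (just the idea, not an actual patch; getUnguardedFileSystem returns the file system without the safety-net wrapper, so the instanceof check can match HadoopFileSystem):

final org.apache.flink.core.fs.FileSystem flinkFs =
        org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(path.toUri());
final org.apache.hadoop.fs.FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem)
        ? ((HadoopFileSystem) flinkFs).getHadoopFileSystem()
        : null;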


Re: Stream job failed after increasing number retained checkpoints

2018-01-10 Thread Piotr Nowojski
Hi,

This Task Manager log suggests that the problem lies on the Job Manager side 
(there is no visible gap in the logs, the GC time reported is accumulated, and 31 seconds 
accumulated over 963 gc collections is a low value). Could you show the Job 
Manager log itself? Probably it’s the one that’s causing the TaskManager to 
time out.

On the other hand, I see that the Task Manager max heap size is ~5GB and I assume 
this is the same setting for the Job Manager. As Stefan pointed out, there is 
some memory overhead on the Job Manager for each retained checkpoint, and it is 
around a couple of hundred bytes (maybe even 1KB) per operator instance. Doing 
some quick math:

2880 checkpoints * 10 task managers * 10 operators in the job * 8 parallelism 
per task manager * 500 bytes = ~1GB

The answer might be that you just need to increase the Job Manager max heap to 
retain 2880 checkpoints.

Piotrek

> On 10 Jan 2018, at 12:00, Jose Miguel Tejedor Fernandez 
>  wrote:
> 
> Hi,
> 
> I wonder what reason you might have that you ever want such a huge number of 
> retained checkpoints? 
> 
> The Flink jobs running on the EMR cluster require a checkpoint at midnight. (In 
> our use case we need to sync a loaded delta to our third-party partner 
> with the streamed data.) The delta loads the whole day's data, and that's why we 
> wanted to have the midnight checkpoint available to start from.
> We could also make a savepoint at midnight, but it’s not as handy (we would 
> need to build our own tooling to do it), and it can’t benefit from the 
> smaller latency of an incremental checkpoint. Another thing is that 
> implementing our own savepoint tool is a bit hard to monitor. Besides, 
> retaining several checkpoints created every minute would 
> also allow us to load a delta at any time. Please, if there are better ways 
> of achieving this, let me know.
> 
> From where does the log trace come from?  
> 
> It comes from the TaskManager.  
> 
> Please search on the opposite side of the time outing connection for possible 
> root cause of the timeout including:
> - possible error/exceptions/warnings
> - long GC pauses or other blocking operations (possibly long unnatural gaps 
> in the logs)
> - machine health (CPU usage, disks usage, network connections)
> 
> It seems that the TaskManager disconnects from the JobManager and then cannot reach it 
> again, and I cannot tell the reason. I think the machine health metrics mentioned 
> above seem to be OK. Would you say the Direct memory stats usage is correct? 
> What is the way to check the GC pauses?
> These are some traces from the TaskManager log, from before/after it detached from 
> the JobManager:
> 
> 2018-01-08 22:26:37,263 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Garbage 
> collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS 
> MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,263 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage 
> stats: [HEAP: 868/5597/5597 MB, NON HEAP: 116/119/-1 MB (used/committed/max)]
> 2018-01-08 22:26:42,263 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Direct memory 
> stats: Count: 100, Total Capacity: 29942814, Used Memory: 29942815
> 2018-01-08 22:26:42,263 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Off-heap pool 
> stats: [Code Cache: 42/42/240 MB (used/committed/max)], [Metaspace: 66/68/-1 
> MB (used/committed/max)], [Compressed Class Space: 8/8/1024 MB 
> (used/committed/max)]
> 2018-01-08 22:26:42,264 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Garbage 
> collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS 
> MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,999 WARN  akka.remote.RemoteWatcher   
>   - Detected unreachable: 
> [akka.tcp://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341 
> ]
> 2018-01-08 22:26:43,034 INFO  org.apache.flink.yarn.YarnTaskManager   
>   - TaskManager akka://flink/user/taskmanager disconnects from 
> JobManager 
> akka.tcp://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager
>  
> : 
> JobManager is no longer reachable
> 2018-01-08 22:26:43,035 INFO  org.apache.flink.yarn.YarnTaskManager   
>   - Cancelling all computations and discarding all cached data.
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Sink: Discarded events 
> (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Sink: Discarded events (4/4) 
> (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
> java.lang.Exception: Ta

Re: Datastream broadcast with KeyBy

2018-01-10 Thread Piotr Nowojski
Hi,

Could you elaborate what is the problem that you are having? What is the 
exception(s) that you are getting? I have tested such simple example and it’s 
seems to be working as expected:

DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);

DataStreamSource<Integer> confStream = env.fromElements(42);

input.keyBy(new MyKeySelector()).connect(confStream.broadcast()).process(new 
MyCoProcessFunction()).print();
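
(A minimal sketch of what the referenced MyKeySelector and MyCoProcessFunction could look like; only the names come from the snippet above, the bodies are assumptions:)

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Keys the Integer elements by their own value.
public class MyKeySelector implements KeySelector<Integer, Integer> {
    @Override
    public Integer getKey(Integer value) {
        return value;
    }
}

// Forwards data elements, adding the latest broadcast config value
// (kept in a plain field here purely for illustration).
public class MyCoProcessFunction extends CoProcessFunction<Integer, Integer, Integer> {
    private int lastConfig;

    @Override
    public void processElement1(Integer value, Context ctx, Collector<Integer> out) {
        out.collect(value + lastConfig);
    }

    @Override
    public void processElement2(Integer value, Context ctx, Collector<Integer> out) {
        lastConfig = value;
    }
}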

Thanks, Piotrek

> On 10 Jan 2018, at 10:01, anujk  wrote:
> 
> Currently we have an Flink pipeline running with Data-Src —> KeyBy —>
> ProcessFunction.  State Management (with RocksDB) and Timers are working
> well.
> Now we have to extend this by having another Config Stream which we want to
> broadcast to all process operators. So wanted to connect the Data Stream
> with Config Stream (with Config Stream being broadcast) and use
> CoProcessFunction to handle both streams.
> 
> KeyBy uses Hash based partitioning and also if we write CustomPartitioner it
> can return only one partition (Array of SelectedChannel option as in
> BroadcastPartitioner is not allowed).
> Would have liked this to work —
> dataStream.keyBy().connect(confStream.broadcast()).process(…RichCoProcessFunction()…)
> but it says both stream must be keyed.
> 
> Is there any way to make this work?
> 
> dataStream.connect(confStream.broadcast()).flatMap(...
> RichCoFlatMapFunction() …) ==> broadcast works. But we really want KeyBy and
> processFunction functionality.
> 
> Thanks,
> Anuj
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Stream job failed after increasing number retained checkpoints

2018-01-10 Thread Jose Miguel Tejedor Fernandez
Hi,

I wonder what reason you might have that you ever want such a huge number
> of retained checkpoints?


The Flink jobs running on the EMR cluster require a checkpoint at midnight. (In
our use case we need to sync a loaded delta to our third-party
partner with the streamed data.) The delta loads the whole day's data, and
that's why we wanted to have the midnight checkpoint available to start
from.
We could also make a savepoint at midnight, but it’s not as handy (we would
need to build our own tooling to do it), and it can’t benefit from the
smaller latency of an incremental checkpoint. Another thing is that
implementing our own savepoint tool is a bit hard to monitor. Besides,
retaining several checkpoints created every minute would
also allow us to load a delta at any time. Please, if there are better ways
of achieving this, let me know.

>From where does the log trace come from?


It comes from the TaskManager.

Please search on the opposite side of the time outing connection for
> possible root cause of the timeout including:
> - possible error/exceptions/warnings
> - long GC pauses or other blocking operations (possibly long unnatural
> gaps in the logs)
> - machine health (CPU usage, disks usage, network connections)


It seems that the TaskManager disconnects from the JobManager and then cannot reach
it again, and I cannot tell the reason. I think the machine health metrics
mentioned above seem to be OK. Would you say the *Direct memory stats* usage
is correct? What is the way to check the GC pauses?
These are some traces from the TaskManager log, from before/after it detached
from the JobManager:

2018-01-08 22:26:37,263 INFO
org.apache.flink.runtime.taskmanager.TaskManager  - Garbage
collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS
MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
2018-01-08 22:26:42,263 INFO
org.apache.flink.runtime.taskmanager.TaskManager  - Memory
usage stats: [HEAP: 868/5597/5597 MB, NON HEAP: 116/119/-1 MB
(used/committed/max)]
2018-01-08 22:26:42,263 INFO
org.apache.flink.runtime.taskmanager.TaskManager  - Direct
memory stats: Count: 100, Total Capacity: 29942814, Used Memory: 29942815
2018-01-08 22:26:42,263 INFO
org.apache.flink.runtime.taskmanager.TaskManager  - Off-heap
pool stats: [Code Cache: 42/42/240 MB (used/committed/max)], [Metaspace:
66/68/-1 MB (used/committed/max)], [Compressed Class Space: 8/8/1024 MB
(used/committed/max)]
2018-01-08 22:26:42,264 INFO
org.apache.flink.runtime.taskmanager.TaskManager  - Garbage
collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS
MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
2018-01-08 22:26:42,999 WARN  akka.remote.RemoteWatcher
 - Detected unreachable: [akka.tcp://
fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341]
2018-01-08 22:26:43,034 INFO  org.apache.flink.yarn.YarnTaskManager
 - TaskManager akka://flink/user/taskmanager disconnects
from JobManager akka.tcp://
fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager:
JobManager is no longer reachable
2018-01-08 22:26:43,035 INFO  org.apache.flink.yarn.YarnTaskManager
 - Cancelling all computations and discarding all cached
data.
2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task
 - Attempting to fail task externally Sink: Discarded
events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task
 - Sink: Discarded events (4/4)
(50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects
from JobManager akka.tcp://
fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager:
JobManager is no longer reachable
at
org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.flink.runti

Re: Queryable State - Count within Time Window

2018-01-10 Thread Velumani Duraisamy
Thank you, Fabian, for the references. This is helpful. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: akka.remote.ShutDownAssociation: Shut down address: akka.tcp://flink@fps-flink-jobmanager:45652

2018-01-10 Thread Till Rohrmann
Hi,

I'm actually not sure what's going on there. I suspect that it must have
something to do with the local setup. I'm not sure from where you submit your
job. If you submitted the job from fps-flink-jobmanager, then this could be the
client actor system, which shuts down after submitting the job because you
submitted the job in detached mode.

It would be helpful if you could describe a little bit more how you set up
the Flink cluster and how you submit jobs. Moreover, it would be great to
get access to all relevant logs (JM, TMs and client).

Cheers,
Till

On Tue, Jan 9, 2018 at 2:24 PM, Fabian Hueske  wrote:

> Hi,
>
> Till (in CC) might be able to help with Akka related questions.
>
> Best, Fabian
>
> 2018-01-08 6:46 GMT+01:00 Hao Sun :
>
>> I am running Flink 1.3.2 in my local docker environment.
>>
>> I see this error, not sure how to find the root cause.
>> I am confused by this error message: why is the JM trying to connect to the JM
>> from a random port to the RPC port 6123?
>>
>> 
>> 2018-01-08 05:38:03,294 ERROR akka.remote.EndpointWriter -
>> AssociationError [akka.tcp://flink@fps-flink-jobmanager:6123] <-
>> [akka.tcp://flink@fps-flink-jobmanager:45652]: Error [Shut down address:
>> akka.tcp://flink@fps-flink-jobmanager:45652] [
>> akka.remote.ShutDownAssociation: Shut down address:
>> akka.tcp://flink@fps-flink-jobmanager:45652
>> Caused by: akka.remote.transport.Transport$InvalidAssociationException:
>> The remote system terminated the association because it is shutting down.
>> ]
>> 
>>
>> Here you can find the full log
>> https://gist.github.com/zenhao/c8d13cce8601e321dd706a2ac53f5032
>>
>>
>


Re: hadoop-free hdfs config

2018-01-10 Thread Till Rohrmann
Hi Sasha,

you're right that if you want to access HDFS from the user code only it
should be possible to use the Hadoop free Flink version and bundle the
Hadoop dependencies with your user code. However, if you want to use
Flink's file system state backend as you did, then you have to start the
Flink cluster with the Hadoop dependency in its classpath. The reason is
that the FsStateBackend is part of the Flink distribution and will be
loaded using the system class loader.

One thing you could try out is to use the RocksDB state backend instead.
Since the RocksDBStateBackend is loaded dynamically, I think it should use
the Hadoop dependencies when trying to load the filesystem.
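
For example, something along these lines (a sketch; it assumes the flink-statebackend-rocksdb dependency is bundled with the job, and the HDFS path is made up):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoints are written to HDFS via the dynamically loaded RocksDB backend.
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        // ... define the job and call env.execute() ...
    }
}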

Cheers,
Till

On Tue, Jan 9, 2018 at 10:46 PM, Oleksandr Baliev  wrote:

> Hello guys,
>
> want to clarify for myself: since flink 1.4.0 allows using a hadoop-free
> distribution and dynamic loading of hadoop dependencies, I suppose that if I
> download the hadoop-free distribution, start the cluster without any hadoop, and
> then load a job jar which has some hadoop dependencies (I
> used 2.6.0-cdh5.10.1), hadoop should be visible in the classpath, and a
> job which accesses hdfs via source/sink/etc. or makes checkpoints should be able to
> run on such a hadoop-free cluster.
>
> But when I start a job, during config initialization for checkpointing I get
> "Hadoop is not in the classpath/dependencies.":
>
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(F
> ileSystem.java:405)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStream
> Factory.(FsCheckpointStreamFactory.java:99)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.cre
> ateStreamFactory(FsStateBackend.java:277)
> ...
>
>
>  What I've found: it seems that in org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem
> there is no "hdfs" scheme registered in FS_FACTORIES, and FALLBACK_FACTORY, which should be
> loaded with the hadoop factory, has org.apache.flink.core.fs.UnsupportedSchemeFactory; but that
> loads when the taskmanager is starting (when there should be no hadoop dependencies), so
> that should be ok.
>
> So as I understand it, the hadoop file system is not recognised by flink if it
> was not loaded at the beginning. Is that correct, or maybe I just messed up
> with something / somewhere?
>
> Thanks,
> Sasha
>


Re: is it possible to convert "retract" datastream to table

2018-01-10 Thread Timo Walther

Hi Yan,

there are no table source interfaces that allow for creating a retract 
stream directly yet. Such an interface has to be carefully designed 
because built-in operators assume that only records that have been 
emitted previously are retracted. However, they are planned for future 
Flink versions.


As a workaround you could implement a custom rule that translates parts 
of your plan into a custom DataStream operator. This might require some 
investigation how the translation is done internally because this is not 
documented. I don't know if it would be worth the effort. You might take 
a look at TableEnvironment.getConfig().setCalciteConfig() where you can 
add additional rules. You can use the available rules in 
org.apache.flink.table.plan.rules.FlinkRuleSets as a reference.


I hope this helps.

Regards,
Timo


On 1/10/18 at 1:04 AM, Yan Zhou [FDS Science] wrote:


Hi,


There are APIs to convert a dynamic table to a retract stream. Is there 
any way to construct a "retract" data stream and convert it into a 
table? I want to read the change log of a relational database from 
kafka, "apply" the changes within flink (by creating a CRow 
DataStream), and register/create a table on the CRow DataStream. Is there 
any way to do this?



Best

Yan





Re: is it possible to convert "retract" datastream to table

2018-01-10 Thread Fabian Hueske
Hi,

Unfortunately, converting a retraction stream into a Table is not supported
yet.
However this is definitely on our road map and will be added in a future
version.

Best, Fabian

2018-01-10 1:04 GMT+01:00 Yan Zhou [FDS Science] :

> Hi,
>
>
> There are APIs to convert a dynamic table to a retract stream. Is there any
> way to construct a "retract" data stream and convert it into a table? I
> want to read the change log of a relational database from kafka, "apply" the
> changes within flink (by creating a CRow DataStream), and register/create a table
> on the CRow DataStream. Is there any way to do this?
>
>
> Best
>
> Yan
>


Datastream broadcast with KeyBy

2018-01-10 Thread anujk
Currently we have an Flink pipeline running with Data-Src —> KeyBy —>
ProcessFunction.  State Management (with RocksDB) and Timers are working
well.
Now we have to extend this by having another Config Stream which we want to
broadcast to all process operators. So wanted to connect the Data Stream
with Config Stream (with Config Stream being broadcast) and use
CoProcessFunction to handle both streams.

KeyBy uses Hash based partitioning and also if we write CustomPartitioner it
can return only one partition (Array of SelectedChannel option as in
BroadcastPartitioner is not allowed).
Would have liked this to work —
dataStream.keyBy().connect(confStream.broadcast()).process(…RichCoProcessFunction()…)
but it says both stream must be keyed.

Is there any way to make this work?

dataStream.connect(confStream.broadcast()).flatMap(...
RichCoFlatMapFunction() …) ==> broadcast works. But we really want KeyBy and
processFunction functionality.

Thanks,
Anuj



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink 1.4.0 Mapr libs issues

2018-01-10 Thread Fabian Hueske
Great, thanks for reporting back!

2018-01-09 18:05 GMT+01:00 ani.desh1512 :

> Hi Fabian,
> Thanks a lot for the reply. Setting the classloader.resolve-order
> configuration seems to have done the trick. For anybody else, having the
> same problem as this, this is the config that I set:
>
> classloader.resolve-order: parent-first
> classloader.parent-first-patterns:
> java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback;com.mapr.;org.apache.
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Custom Partitioning for Keyed Streams

2018-01-10 Thread Piotr Nowojski
Hi,

I don’t think it is possible to enforce scheduling of two keys to different 
nodes, since all of that is based on hashes.

For some cases, doing a pre-aggregation step (an initial aggregation done before 
the keyBy, followed by a final aggregation after the keyBy) can be the 
solution for handling data skew. With pre-aggregation, some (most?) of the 
work can be distributed and done on the source node instead of doing all of 
the heavy lifting on the destination node. It has not yet been merged into the 
Flink code, but it’s entirely user-space code, which you could copy-paste 
(and adjust) into your project. The pull request containing the pre-aggregation is here:
https://github.com/apache/flink/pull/4626 

Please pay attention to the limitations of this code (documented in the 
Javadoc).

If the above code doesn’t work for you for whatever reason, you can also try to 
implement some custom-tailored pre-aggregation, for example by having two keyBy steps, 
where in the first you artificially split the A and B keys into a couple of smaller 
ones, and the second keyBy merges/squashes the results.
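
A rough sketch of that idea (the input type Tuple2<String, Long>, the fan-out factor of 4 and the rolling sum() are all assumptions; in practice you would probably aggregate over windows):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.concurrent.ThreadLocalRandom;

// Stage 1: attach a random "salt" to every key so that hot keys spread over
// several parallel subtasks, then pre-aggregate per (key, salt).
DataStream<Tuple3<String, Integer, Long>> preAggregated = input
        .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>>() {
            @Override
            public Tuple3<String, Integer, Long> map(Tuple2<String, Long> v) {
                return Tuple3.of(v.f0, ThreadLocalRandom.current().nextInt(4), v.f1);
            }
        })
        .keyBy(0, 1)   // key by (original key, salt)
        .sum(2);       // partial aggregate per sub-key

// Stage 2: drop the salt and combine the partial results per original key.
DataStream<Tuple2<String, Long>> merged = preAggregated
        .map(new MapFunction<Tuple3<String, Integer, Long>, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Tuple3<String, Integer, Long> v) {
                return Tuple2.of(v.f0, v.f2);
            }
        })
        .keyBy(0)
        .sum(1);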

Piotrek

> On 9 Jan 2018, at 21:55, Martin, Nick  wrote:
> 
> Have a set of stateful operators that rely on keyed state. There is 
> substantial skew between keys (i.e. there will be 100 messages on keys A and 
> B, and 10 messages each on keys C-J), and key selection assignment is 
> dictated by the needs of my application such that I can’t choose keys in a 
> way that will eliminate the skew. The skew is somewhat predictable (i.e. I 
> know keys A and B will usually get roughly 10x as many messages as the rest) 
> and fairly consistent on different timescales (i.e. counting the messages on 
> each key for 30 seconds would provide a reasonably good guess as to the 
> distribution of messages that will be received over the next 10-20 minutes).
>  
> The problem I’m having is that often the high volume keys (A and B in the 
> example) end up on the same task slot and slow it down, while the low volume 
> ones are distributed across the other operators, leaving them underloaded. I 
> looked into the available physical partitioning functions, but it looks like 
> that functionality is generally incompatible with keyed streams, and I need 
> access to keyed state to do my actual processing. Is there any way I can get 
> better load balancing while using keyed state?
> 
> Notice: This e-mail is intended solely for use of the individual or entity to 
> which it is addressed and may contain information that is proprietary, 
> privileged and/or exempt from disclosure under applicable law. If the reader 
> is not the intended recipient or agent responsible for delivering the message 
> to the intended recipient, you are hereby notified that any dissemination, 
> distribution or copying of this communication is strictly prohibited. This 
> communication may also contain data subject to U.S. export laws. If so, data 
> subject to the International Traffic in Arms Regulation cannot be 
> disseminated, distributed, transferred, or copied, whether incorporated or in 
> its original form, to foreign nationals residing in the U.S. or abroad, 
> absent the express prior approval of the U.S. Department of State. Data 
> subject to the Export Administration Act may not be disseminated, 
> distributed, transferred or copied contrary to U. S. Department of Commerce 
> regulations. If you have received this communication in error, please notify 
> the sender by reply e-mail and destroy the e-mail message and any physical 
> copies made of the communication.
>  Thank you. 
> *



Re: Stream job failed after increasing number retained checkpoints

2018-01-10 Thread Stefan Richter
Hi,

there is no known limitation in the strict sense, but you might run out of dfs 
space or job manager memory if you keep around a huge number of checkpoints. I 
wonder what reason you might have to ever want such a huge number of 
retained checkpoints? Usually keeping one checkpoint should do the job, maybe a 
couple more if you are very afraid of corruption that goes beyond your DFS's 
capabilities to handle. Is there a reason for that, or maybe a 
misconception that increasing the number of retained checkpoints is good for something?

Best,
Stefan 

> Am 10.01.2018 um 08:54 schrieb Piotr Nowojski :
> 
> Hi,
> 
> Increasing akka’s timeouts is rarely a solution for any problem - it either 
> does not help, or just masks the issue, making it less visible. But yes, it is 
> possible to bump the limits: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka
>  
> 
> 
> I don’t think that state.checkpoints.num-retained was thought to handle such 
> large numbers of retained checkpoint so maybe there are some known/unknown 
> limitations. Stefan, do you know something in this regard?
> 
> Parallel thing to do is that like for any other akka timeout, you should 
> track down the root cause of it. This one warning line doesn’t tell much. 
> From where does it come from? Client log? Job manager log? Task manager log? 
> Please search on the opposite side of the time outing connection for possible 
> root cause of the timeout including:
> - possible error/exceptions/warnings
> - long GC pauses or other blocking operations (possibly long unnatural gaps 
> in the logs)
> - machine health (CPU usage, disks usage, network connections)
> 
> Piotrek
> 
>> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez 
>> mailto:jose.fernan...@rovio.com>> wrote:
>> 
>> Hello,
>> 
>> I have several stream jobs running (v. 1.3.1 ) in production which always 
>> fails after a fixed period of around 30h after being executing. That's the 
>> WARN trace before failing:
>> 
>> Association with remote system 
>> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876 
>> ] has failed, 
>> address is now gated for [5000] ms. Reason: [Association failed with 
>> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876 
>> ]] Caused by: 
>> [No response from remote for outbound association. Handshake timed out after 
>> [2 ms].
>> 
>> The main change done in the job configuration was to increase the 
>> state.checkpoints.num-retained from 1 to 2880. I am using asynchronous 
>> RocksDB to persists to snapshot the state. (I attach some screenshots with 
>> the  checkpoint conf from webUI)
>> 
>> May my assumption be correct that the increase of checkpoints.num-retained 
>> is causing the problem? Any known issue regarding this?
>> Besides, Is there any way to increase the Akka handshake timeout from the 
>> current 2 ms to a higher value? I considered that it may be convenient 
>> to increase the timeout to 1 minute instead.
>> 
>> BR
>> 
>> 
>> > 17.35.18.png>
> 



Re: What's the meaning of "Registered `TaskManager` at akka://flink/deadLetters " ?

2018-01-10 Thread Piotr Nowojski
Hi,

Search both the job manager and task manager logs for the ip address(es) and port(s) 
that have timed out. First of all, make sure that the nodes are visible to each 
other using a simple ping. Afterwards, please check that those timed-out 
ports are open and not blocked by some firewall (telnet).

You can search the documentation for the configuration parameters with “port” 
in name:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html 

But note that many of them are random by default.

Piotrek

> On 9 Jan 2018, at 17:56, Reza Samee  wrote:
> 
> 
> I'm running a flink cluster (a mini one with just one node), but the problem 
> is that my TaskManager can't reach my JobManager!
> 
> Here are logs from TaskManager
> ...
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 20, timeout: 30 seconds)
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 21, timeout: 30 seconds)
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 22, timeout: 30 seconds)
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 23, timeout: 30 seconds)
> Trying to register at JobManager akka.tcp://flink@MY_PRIV_IP/user/jobmanager 
> (attempt 24, timeout: 30 seconds)
> ...
> 
> My "JobManager UI" shows my TaskManager with this Path & ID: 
> "akka://flink/deadLetters" ( in TaskManagers tab)
> And I found these lines in my JobManger stdout:
> 
> Resource Manager associating with leading JobManager 
> Actor[akka://flink/user/jobmanager#-275619168] - leader session null
> TaskManager ResourceID{resourceId='1132cbdaf2d8204e5e42e321e8592754'} has 
> started.
> Registered TaskManager at MY_PRIV_IP (akka://flink/deadLetters) as 
> 7d9568445b4557a74d05a0771a08ad9c. Current number of registered hosts is 1. 
> Current number of alive task slots is 20.
> 
> 
> What's the meaning of these lines? Where should I look for the solution?
> 
> 
> 
> 
> -- 
> رضا سامعی / http://samee.blog.ir