Re: Apache Flink - Are counters reliable and accurate ?

2019-06-27 Thread Chesnay Schepler
So here's the thing: Metrics are accurate, so long as the job is 
running. Once the job terminates metrics are cleaned up and not 
persisted anywhere, with the exception of a few metrics (like numRecordsIn).


Another thing that is always good to double-check is to enable DEBUG 
logging and re-run your test.


On 27/06/2019 22:41, M Singh wrote:

Hi Chesnay:

Thanks for your response.

My job runs for a few minutes and I've tried setting the reporter
interval to 1 second.


I will try the counter on a longer running job.

Thanks again.

On Thursday, June 27, 2019, 11:46:17 AM EDT, Chesnay Schepler 
 wrote:



1) None that I'm aware of.
2) You should use counters.
3) No, counters are not checkpointed, but you could store the value in
state yourself (see the sketch below).
4) None that I'm aware of that doesn't require modifications to the 
application logic.
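
A minimal sketch of 3), assuming a keyed stream (keyed state is only available
after a keyBy) and reusing the metric group/names from the snippet further down;
the counter stays a plain metric while the accurate count lives in checkpointed
state:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter counter;              // metric: not checkpointed, reset on restart
    private transient ValueState<Long> countState;  // keyed state: included in checkpoints

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getMetricGroup()
                .addGroup("test", "split").counter("success");
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("success-count", Long.class));
    }

    @Override
    public String map(String value) throws Exception {
        Long current = countState.value();
        countState.update(current == null ? 1L : current + 1);
        counter.inc();  // still exposed through the metrics system for monitoring
        return value;
    }
}

Applied as dataStream.keyBy(...).map(new CountingMapper()); the class name is
only illustrative.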


How long does your job run for, and how do you access metrics?

On 27/06/2019 17:36, M Singh wrote:

Hi:

I need to collect application metrics which are counts (per unit of 
time eg: minute) for certain events.  There are two ways of doing this:


1. Create separate streams (using split streams etc.) in the
application explicitly, then aggregate the counts in a window and
save them. This mixes metrics collection with application logic and
makes the application logic more complex.

2. Use the Flink metrics framework (counter, gauge, etc.) to save metrics

I have a very small test with 2 events but when I run the application 
the counters are not getting saved (they show value 0) even though 
that part of the code is being executed.  I do see the numRecordsIn 
counters being updated from the source operator.  I've also tried 
incrementing the count by 10 (instead of 1) every time the function
gets executed, but still the counts remain 0.


Here is snippet of the code:

dataStream.map(new RichMapFunction<String, String>() {

    protected Counter counter;

    public void open(Configuration parameters) {
        counter = getRuntimeContext().getMetricGroup()
                .addGroup("test", "split").counter("success");
    }

    @Override
    public String map(String value) throws Exception {
        counter.inc();
        return value;
    }
});


As I mentioned, I do get the success metric count but the value is 
always 0, even though the above map function was executed.


My questions are:

1. Are there any issues regarding counters being approximate?
2. If I want to collect accurate counts, is it recommended to use
counters or should I do it explicitly (which makes the code too
complex)?

3. Do counters participate in Flink's failure/checkpointing/recovery?
4. Is there any better way of collecting application metric counts?

Thanks

Mans







[External] Regarding kinesis data analytics for flink

2019-06-27 Thread Vishal Sharma
Hi Leo,

Recently, I came across Kinesis Data Analytics, which provides managed Flink
in AWS. It seems promising.

Some concerns that I have from my very limited exploration so far are:
=> it uses Flink 1.6 (not the latest version)
=> it doesn't support Kafka streams directly
=> there is no option to configure task slots

Just wanted to know if you can share your experience with this platform.

Thanks,
Vishal Sharma




Re:Flink batch job memory/disk leak when invoking set method on a static Configuration object.

2019-06-27 Thread Haibo Sun
Hi, Vadim


A similar issue has occurred in earlier versions; see
https://issues.apache.org/jira/browse/FLINK-4485.
Are you running a Flink session cluster on YARN? I think it might be a bug.
I'll see if I can reproduce it with the master branch code, and if yes, I will
try to investigate it.


If someone already knows the cause of the problem, that would be best, as it
won't need to be re-investigated.


Best,
Haibo



At 2019-06-28 00:46:43, "Vadim Vararu"  wrote:

Hi guys,


I have a simple batch job with a custom output formatter that writes to a local 
file.


public class JobHadoop {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
                .map(line -> line + "dd")
                .write(new HadoopUsageOutputFormat(), "file:///tmp/out");

        env.execute(JobHadoop.class.getName());
    }
}

public class HadoopUsageOutputFormat extends FileOutputFormat<String>
        implements OutputFormat<String> {

    private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();
    public static final String DEFAULT_LINE_DELIMITER = "\n";

    private Writer writer;

    static {
        DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        writer = new OutputStreamWriter(new BufferedOutputStream(stream));
    }

    @Override
    public void writeRecord(String record) throws IOException {
        writer.write(record);
        writer.write(DEFAULT_LINE_DELIMITER);
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            this.writer.flush();
            this.writer.close();
        }

        super.close();
    }
}


The problem is that after the job is finished, there is a memory leak somewhere
that does not permit the blobStore files of the job to be deleted. The number of
such "deleted" files increases after each job run. Even though they are marked as
deleted, a reference to the file is kept somewhere in the JobManager process,
which prevents the actual deletion.








Also, the problem reproduces only if I actually invoke the set method of 
Configuration:
static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}


From my observations, if I change the 
private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();
to a non-static field, then the problem does not reproduce any more.




However, I'm interested in whether that's normal behaviour or a bug/leak somewhere
in Flink itself.


Thanks, 
Vadim.
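
For reference, a condensed sketch of the work-around Vadim describes above (same
class and property names as in his example; the only change is that the Hadoop
Configuration is created per instance in open() instead of in static class state):

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;

public class HadoopUsageOutputFormat extends FileOutputFormat<String> {

    public static final String DEFAULT_LINE_DELIMITER = "\n";

    // Created in open(), so nothing is pinned by static state of a class loaded
    // in the job's user-code class loader, and the format itself stays serializable.
    private transient Configuration hadoopConfiguration;

    private Writer writer;

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        hadoopConfiguration = new Configuration();
        hadoopConfiguration.set("just.a.prop", "/var/test1");
        writer = new OutputStreamWriter(new BufferedOutputStream(stream));
    }

    @Override
    public void writeRecord(String record) throws IOException {
        writer.write(record);
        writer.write(DEFAULT_LINE_DELIMITER);
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.flush();
            writer.close();
        }
        super.close();
    }
}

Whether the static variant is a Flink bug or expected class-loading behaviour is
exactly the open question above.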



About "Flink 1.7.0 HA based on zookeepers "

2019-06-27 Thread 胡逸才
Hi Tan:
I have the same problem as you when running Flink 1.7.2 in HA mode on Kubernetes.
May I ask if you have solved this problem, and how? After I started the two
JobManagers normally and then tried to kill one of them, it could not restart
normally. Both JobManagers reported this error. The specific log is as follows:








2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN  
akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote connection 
to [null] failed with java.net.ConnectException: Connection refused: 
tdh2/192.168.208.55:56529
2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-14 - Association with remote system 
[akka.tcp://flink@tdh2:56529] has failed, address is now gated for [50] ms. 
Reason: [Association failed with [akka.tcp://flink@tdh2:56529]] Caused by: 
[Connection refused: tdh2/192.168.208.55:56529]
2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-14 - Association with remote system 
[akka.tcp://flink@tdh2:56529] has failed, address is now gated for [50] ms. 
Reason: [Association failed with [akka.tcp://flink@tdh2:56529]] Caused by: 
[Connection refused: tdh2/192.168.208.55:56529]
2019-06-28 09:57:57.260 [flink-rest-server-netty-worker-thread-7] ERROR 
o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not retrieve 
the redirect address.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on [Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] 
after [1 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] after [1 
ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
... 9 common frames omitted
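
For reference, the ZooKeeper-based HA settings this setup relies on live in
flink-conf.yaml; a minimal sketch (all values are illustrative, not taken from the
reporter's cluster) looks like:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/ha/
# Keep the cluster-id stable across JobManager restarts so a restarted JM finds
# the same leader/job information in ZooKeeper.
high-availability.cluster-id: /my-flink-cluster

The connection-refused messages above show the JobManagers still trying to reach
the old leader address, so the leader-election/HA configuration is worth
double-checking first.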

Re: HDFS checkpoints for rocksDB state backend:

2019-06-27 Thread Yang Wang
Hi, Andrea

If you are running a Flink cluster on YARN, the jar
`flink-shaded-hadoop2-uber-1.6.4.jar` should exist in the lib dir of the
Flink client, so that it can be uploaded to the YARN Distributed Cache
and then be available on the JM and TMs.
If you are running a standalone Flink cluster, the jar
`flink-shaded-hadoop2-uber-1.6.4.jar` should exist on each worker on which
you want to start a TaskManager.

You can check the classpath in the TaskManager log.

Andrea Spina wrote on Thursday, 27 June 2019 at 3:52 PM:

> Hi Qiu,
> my jar does not contain the class
> `org.apache.hadoop.hdfs.protocol.HdfsConstants`, but I do expect it is
> contained within `flink-shaded-hadoop2-uber-1.6.4.jar`, which is located in
> the Flink cluster libs.
>
> On Thursday, 27 June 2019 at 04:03, Congxian Qiu <
> qcx978132...@gmail.com> wrote:
>
>> Hi  Andrea
>>
>> As for the NoClassDefFoundError, could you please verify that the class
>> `org.apache.hadoop.hdfs.protocol.HdfsConstants` exists in your jar.
>> Or could you use Arthas[1] to check if there exists the class when
>> running the job?
>>
>> [1] https://github.com/alibaba/arthas
>> Best,
>> Congxian
>>
>>
>> Andrea Spina wrote on Thursday, 27 June 2019 at 1:57 AM:
>>
>>> Dear community,
>>> I'm trying to use HDFS checkpoints in flink-1.6.4 with the following
>>> configuration
>>>
>>> state.backend: rocksdb
>>> state.checkpoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/checkpoint
>>> state.savepoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/savepoints
>>>
>>> and I record the following exceptions
>>>
>>> *Caused by: java.io.IOException: Could not flush and close the file
>>> system output stream to
>>> hdfs://my.rb.biz:8020/flink/checkpoint/fd35c7145e6911e1721cd0f03656b0a8/chk-2/48502e63-cb69-4944-8561-308da2f9f26a
>>> 
>>> in order to obtain the stream state handleat
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
>>>   at
>>> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
>>>   at
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
>>>   at
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
>>>   at
>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)at
>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>> ... 7 moreCaused by: java.io.IOException: DataStreamer Exception:
>>>   at
>>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)Caused
>>> by: java.lang.NoClassDefFoundError: Could not initialize class
>>> org.apache.hadoop.hdfs.protocol.HdfsConstantsat
>>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
>>>   at
>>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
>>>   at
>>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)*
>>>
>>> or
>>>
>>> *  at java.util.concurrent.FutureTask.run(FutureTask.java:266)at
>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>> ... 7 moreCaused by: java.io.IOException: DataStreamer Exception:
>>>   at
>>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)Caused
>>> by: javax.xml.parsers.FactoryConfigurationError: Provider for class
>>> javax.xml.parsers.DocumentBuilderFactory cannot be createdat
>>> javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
>>>   at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
>>>   at
>>> javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
>>>   at
>>> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2515)
>>>   at
>>> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>>>   at
>>> org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>>>   at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
>>>   at
>>> org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
>>> at
>>> org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
>>> at
>>> org.apache.hadoop.hdfs.protocol.HdfsConstants.(HdfsConstants.java:76)
>>>   at
>>> 

Re: Apache Flink - Running application with different flink configurations (flink-conf.yml) on EMR

2019-06-27 Thread M Singh
Hi Xintong: Thanks for your pointers.
I tried -Dparallelism.default=2 locally (in the IDE) and it did not work. Do you
know if there is a common way that would work on EMR, locally, and in the IDE?
Thanks again.
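
For the local/IDE case specifically, one option (a sketch of one approach, not the
only one) is to build a Configuration in code and hand it to a local environment;
plain -D JVM properties are not read as Flink config there, which is likely why
-Dparallelism.default=2 had no effect in the IDE. On EMR/YARN the same values
would instead come from flink-conf.yaml (via FLINK_CONF_DIR) or -yD:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRunExample {  // hypothetical class, for illustration only

    public static void main(String[] args) throws Exception {
        // Cluster-level options for the embedded mini cluster go into a Configuration...
        Configuration conf = new Configuration();
        conf.setString("taskmanager.numberOfTaskSlots", "4");  // illustrative value

        // ...while job-level settings such as parallelism are set programmatically.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        env.fromElements("a", "b", "c").print();
        env.execute("local-config-example");
    }
}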
On Thursday, June 27, 2019, 12:03:06 AM EDT, Xintong Song 
 wrote:  
 
 Hi Singh,
You can use the environment variable "FLINK_CONF_DIR" to specify the path to the
directory of config files. You can also override config options with command line
arguments prefixed with -D (for yarn-session.sh) or -yD (for the 'flink run'
command).

Thank you~

Xintong Song




On Wed, Jun 26, 2019 at 9:13 PM M Singh  wrote:

Hi:
I have a single EMR cluster with Flink and want to run multiple applications on
it with different Flink configurations. Is there a way to
1. pass the config file name for each application, or
2. overwrite the config parameters via command line arguments for the application?
This is similar to how we can overwrite the default parameters in Spark.
I searched the documents and have tried using ParameterTool with the config
parameter names, but it has not worked as yet.
Thanks for your help.
Mans
  

Re: Apache Flink - Are counters reliable and accurate ?

2019-06-27 Thread M Singh
Hi Chesnay:
Thanks for your response.
My job runs for a few minutes and I've tried setting the reporter interval to 1
second.
I will try the counter on a longer running job.
Thanks again.
On Thursday, June 27, 2019, 11:46:17 AM EDT, Chesnay Schepler 
 wrote:  
 
  1) None that I'm aware of.
 2) You should use counters.
 3) No, counters are not checkpointed, but you could store the value in state 
yourself.
 4) None that I'm aware of that doesn't require modifications to the 
application logic.
 
 How long does your job run for, and how do you access metrics?
 
 On 27/06/2019 17:36, M Singh wrote:
  
Hi:

I need to collect application metrics which are counts (per unit of time, e.g.
minute) for certain events.  There are two ways of doing this:

1. Create separate streams (using split streams etc.) in the application
explicitly, then aggregate the counts in a window and save them.  This mixes
metrics collection with application logic and makes the application logic more
complex.
2. Use the Flink metrics framework (counter, gauge, etc.) to save metrics.

I have a very small test with 2 events, but when I run the application the
counters are not getting saved (they show value 0) even though that part of the
code is being executed.  I do see the numRecordsIn counters being updated from
the source operator.  I've also tried incrementing the count by 10 (instead of
1) every time the function gets executed, but still the counts remain 0.

Here is a snippet of the code:

dataStream.map(new RichMapFunction<String, String>() {
    protected Counter counter;

    public void open(Configuration parameters) {
        counter = getRuntimeContext().getMetricGroup()
                .addGroup("test", "split").counter("success");
    }

    @Override
    public String map(String value) throws Exception {
        counter.inc();
        return value;
    }
});

As I mentioned, I do get the success metric count but the value is always 0,
even though the above map function was executed.

My questions are:

1. Are there any issues regarding counters being approximate?
2. If I want to collect accurate counts, is it recommended to use counters or
should I do it explicitly (which makes the code too complex)?
3. Do counters participate in Flink's failure/checkpointing/recovery?
4. Is there any better way of collecting application metric counts?

Thanks

Mans
 

 
   

Flink batch job memory/disk leak when invoking set method on a static Configuration object.

2019-06-27 Thread Vadim Vararu
Hi guys,

I have a simple batch job with a custom output formatter that writes to a local 
file.


public class JobHadoop {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
                .map(line -> line + "dd")
                .write(new HadoopUsageOutputFormat(), "file:///tmp/out");

        env.execute(JobHadoop.class.getName());
    }
}

public class HadoopUsageOutputFormat extends FileOutputFormat<String>
        implements OutputFormat<String> {

    private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();
    public static final String DEFAULT_LINE_DELIMITER = "\n";

    private Writer writer;

    static {
        DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        writer = new OutputStreamWriter(new BufferedOutputStream(stream));
    }

    @Override
    public void writeRecord(String record) throws IOException {
        writer.write(record);
        writer.write(DEFAULT_LINE_DELIMITER);
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            this.writer.flush();
            this.writer.close();
        }

        super.close();
    }
}

The problem is that after the job is finished, there is a memory leak somewhere
that does not permit the blobStore files of the job to be deleted. The number of
such "deleted" files increases after each job run. Even though they are marked as
deleted, a reference to the file is kept somewhere in the JobManager process,
which prevents the actual deletion.

[inline screenshot attachment omitted]


Also, the problem reproduces only if I actually invoke the set method of 
Configuration:

static {
DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}

From my observations, if I change the

private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new 
Configuration();

to a non-static field, then the problem does not reproduce any more.


However, I'm interested in whether that's normal behaviour or a bug/leak somewhere
in Flink itself.

Thanks,
Vadim.



Debug Kryo.Serialization Exception

2019-06-27 Thread Fabian Wollert
Hi, we have some Flink jobs (Flink version 1.7.1) consuming from a custom
source and ingesting into an Elasticsearch cluster (v5.6). Recently we have been
seeing more and more exceptions like this:

com.esotericsoftware.kryo.KryoException: Unable to find class: com. ^
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com. ^
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)

... 13 more

or

com.esotericsoftware.kryo.KryoException: Unable to find class:
com.fasterxml.jackson.databind.node.DoubleNod
com.fasterxml.jackson.databind.node.ObjectNode
Serialization trace:
_children (com.fasterxml.jackson.databind.node.ObjectNode)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:209)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:50)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException:
com.fasterxml.jackson.databind.node.DoubleNod
com.fasterxml.jackson.databind.node.ObjectNode
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 

Re: Apache Flink - Are counters reliable and accurate ?

2019-06-27 Thread Chesnay Schepler

1) None that I'm aware of.
2) You should use counters.
3) No, counters are not checkpointed, but you could store the value in 
state yourself.
4) None that I'm aware of that doesn't require modifications to the 
application logic.


How long does your job run for, and how do you access metrics?

On 27/06/2019 17:36, M Singh wrote:

Hi:

I need to collect application metrics which are counts (per unit of 
time eg: minute) for certain events.  There are two ways of doing this:


1. Create separate streams (using split streams etc.) in the application
explicitly, then aggregate the counts in a window and save them. This
mixes metrics collection with application logic and makes the
application logic more complex.

2. Use the Flink metrics framework (counter, gauge, etc.) to save metrics

I have a very small test with 2 events but when I run the application 
the counters are not getting saved (they show value 0) even though 
that part of the code is being executed.  I do see the numRecordsIn 
counters being updated from the source operator.  I've also tried 
incrementing the count by 10 (instead of 1) every time the function
gets executed, but still the counts remain 0.


Here is snippet of the code:

dataStream.map(new RichMapFunction<String, String>() {

    protected Counter counter;

    public void open(Configuration parameters) {
        counter = getRuntimeContext().getMetricGroup()
                .addGroup("test", "split").counter("success");
    }

    @Override
    public String map(String value) throws Exception {
        counter.inc();
        return value;
    }
});


As I mentioned, I do get the success metric count but the value is 
always 0, even though the above map function was executed.


My questions are:

1. Are there any issues regarding counters being approximate?
2. If I want to collect accurate counts, is it recommended to use
counters or should I do it explicitly (which makes the code too
complex)?

3. Do counters participate in Flink's failure/checkpointing/recovery?
4. Is there any better way of collecting application metric counts?

Thanks

Mans





Apache Flink - Are counters reliable and accurate ?

2019-06-27 Thread M Singh
Hi:
I need to collect application metrics which are counts (per unit of time, e.g.
minute) for certain events.  There are two ways of doing this:
1. Create separate streams (using split streams etc.) in the application
explicitly, then aggregate the counts in a window and save them.  This mixes
metrics collection with application logic and makes the application logic more
complex.
2. Use the Flink metrics framework (counter, gauge, etc.) to save metrics.
I have a very small test with 2 events, but when I run the application the
counters are not getting saved (they show value 0) even though that part of the
code is being executed.  I do see the numRecordsIn counters being updated from
the source operator.  I've also tried incrementing the count by 10 (instead of
1) every time the function gets executed, but still the counts remain 0.
Here is a snippet of the code:

dataStream.map(new RichMapFunction<String, String>() {
    protected Counter counter;

    public void open(Configuration parameters) {
        counter = getRuntimeContext().getMetricGroup()
                .addGroup("test", "split").counter("success");
    }

    @Override
    public String map(String value) throws Exception {
        counter.inc();
        return value;
    }
});

As I mentioned, I do get the success metric count but the value is always 0,
even though the above map function was executed.
My questions are:
1. Are there any issues regarding counters being approximate?
2. If I want to collect accurate counts, is it recommended to use counters or
should I do it explicitly (which makes the code too complex)?
3. Do counters participate in Flink's failure/checkpointing/recovery?
4. Is there any better way of collecting application metric counts?
Thanks
Mans

Re: Limit number of jobs or Job Managers

2019-06-27 Thread Pankaj Chand
Hi Haibo Sun,

It's exactly what I needed. Thank you so much!

Best,

Pankaj

On Thu, Jun 27, 2019 at 7:45 AM Haibo Sun  wrote:

> Hi,  Pankaj Chand
>
> If you're running Flink on YARN, you can do this by limiting the number of
> applications in the cluster or in the queue. As far as I know, Flink does
> not limit that.
>
> The following are the configuration items for YARN:
> yarn.scheduler.capacity.maximum-applications
> yarn.scheduler.capacity.<queue-path>.maximum-applications
>
> Best,
> Haibo
>
> At 2019-06-27 20:55:48, "Pankaj Chand"  wrote:
>
> Hi everyone,
>
> Is there any way (parameter or function) I can limit the number of
> concurrent jobs executing in my Flink cluster? Or alternatively, limit the
> number of concurrent Job Managers (since there has to be one Job Manager
> for every job)?
>
> Thanks!
>
> Pankaj
>
>


Re:Limit number of jobs or Job Managers

2019-06-27 Thread Haibo Sun
Hi,  Pankaj Chand 


If you're running Flink on YARN, you can do this by limiting the number of 
applications in the cluster or in the queue. As far as I know, Flink does not 
limit that.


The following are the configuration items for YARN (see the
capacity-scheduler.xml sketch below):
yarn.scheduler.capacity.maximum-applications
yarn.scheduler.capacity.<queue-path>.maximum-applications
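
A minimal capacity-scheduler.xml sketch of those two items (the queue name "flink"
and both limit values are purely illustrative assumptions):

<property>
  <name>yarn.scheduler.capacity.maximum-applications</name>
  <value>10</value>  <!-- cluster-wide cap on pending/running applications -->
</property>
<property>
  <name>yarn.scheduler.capacity.root.flink.maximum-applications</name>
  <value>2</value>   <!-- cap for the hypothetical root.flink queue -->
</property>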


Best,
Haibo

At 2019-06-27 20:55:48, "Pankaj Chand"  wrote:

Hi everyone,


Is there any way (parameter or function) I can limit the number of concurrent 
jobs executing in my Flink cluster? Or alternatively, limit the number of 
concurrent Job Managers (since there has to be one Job Manager for every job)?


Thanks!


Pankaj

Limit number of jobs or Job Managers

2019-06-27 Thread Pankaj Chand
Hi everyone,

Is there any way (parameter or function) I can limit the number of
concurrent jobs executing in my Flink cluster? Or alternatively, limit the
number of concurrent Job Managers (since there has to be one Job Manager
for every job)?

Thanks!

Pankaj


Re: HDFS checkpoints for rocksDB state backend:

2019-06-27 Thread Andrea Spina
Hi Qiu,
my jar does not contain the class
`org.apache.hadoop.hdfs.protocol.HdfsConstants`, but I do expect it is
contained within `flink-shaded-hadoop2-uber-1.6.4.jar`, which is located in
the Flink cluster libs.

On Thursday, 27 June 2019 at 04:03, Congxian Qiu <
qcx978132...@gmail.com> wrote:

> Hi  Andrea
>
> As for the NoClassDefFoundError, could you please verify that the class
> `org.apache.hadoop.hdfs.protocol.HdfsConstants` exists in your jar.
> Or could you use Arthas[1] to check if there exists the class when running
> the job?
>
> [1] https://github.com/alibaba/arthas
> Best,
> Congxian
>
>
> Andrea Spina wrote on Thursday, 27 June 2019 at 1:57 AM:
>
>> Dear community,
>> I'm trying to use HDFS checkpoints in flink-1.6.4 with the following
>> configuration
>>
>> state.backend: rocksdb
>> state.checkpoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/checkpoint
>> state.savepoints.dir: hdfs://rbl1.stage.certilogo.radicalbit.io:8020/flink/savepoints
>>
>> and I record the following exceptions
>> *Caused by: java.io.IOException: Could not flush and close the file
>> system output stream to
>> hdfs://my.rb.biz:8020/flink/checkpoint/fd35c7145e6911e1721cd0f03656b0a8/chk-2/48502e63-cb69-4944-8561-308da2f9f26a
>> 
>> in order to obtain the stream state handleat
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
>>   at
>> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
>>   at
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
>>   at
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
>>   at
>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>> ... 7 moreCaused by: java.io.IOException: DataStreamer Exception:
>>   at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)Caused
>> by: java.lang.NoClassDefFoundError: Could not initialize class
>> org.apache.hadoop.hdfs.protocol.HdfsConstantsat
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
>>   at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
>>   at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)*
>>
>> or
>>
>> *  at java.util.concurrent.FutureTask.run(FutureTask.java:266)at
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>> ... 7 moreCaused by: java.io.IOException: DataStreamer Exception:
>>   at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:562)Caused
>> by: javax.xml.parsers.FactoryConfigurationError: Provider for class
>> javax.xml.parsers.DocumentBuilderFactory cannot be createdat
>> javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
>>   at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
>>   at
>> javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
>>   at
>> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2515)
>>   at
>> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>>   at
>> org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>>   at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
>>   at
>> org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
>> at
>> org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
>> at
>> org.apache.hadoop.hdfs.protocol.HdfsConstants.(HdfsConstants.java:76)
>>   at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318)
>>   at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1262)
>>   at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)*
>>
>> In my lib folder I have the HDFS uber jar as usual, but I am not
>> able to get the job to checkpoint its state correctly.
>> I also read [1], but it is not helping.
>>
>> Thank you for the precious help
>>
>> [1] - https://www.cnblogs.com/chendapao/p/9170566.html
>> --
>> *Andrea Spina*
>> Head of R 

Re: Hello-world example of Flink Table API using a edited Calcite rule

2019-06-27 Thread JingsongLee
Got it, it's clear. TableStats is one of the important functions of ExternalCatalog.
That is the right way.

Best, JingsongLee


--
From:Felipe Gutierrez 
Send Time: Thursday, 27 June 2019, 14:53
To:JingsongLee 
Cc:user 
Subject:Re: Hello-world example of Flink Table API using a edited Calcite rule

Hi JingsongLee,

Sorry for not explaining it very well. I will try to clarify my idea.
1 - I want to use InMemoryExternalCatalog as a way to save some statistics
which I create by listening to a stream.
2 - Then I will have my core application using the Table API to execute some
aggregation/join.
3 - Because the application in 2 uses the Table API, I am able to influence its
plan through Calcite configuration rules. So, I am going to use the statistics
from 1 to change the rules dynamically in 2.

Do you think that is clear? And is it a feasible application with the current
capabilities of the Table API?
ps.: I am going to look at the links that you mentioned. Thanks for that!

Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com

On Thu, Jun 27, 2019 at 7:23 AM JingsongLee  wrote:

Hi Felipe:

Yeah, you can use InMemoryExternalCatalog and CalciteConfig,
 but I don't quite understand what you mean.
InMemoryExternalCatalog provides methods to create, drop, and 
alter (sub-)catalogs or tables. And CalciteConfig is for defining a
 custom Calcite configuration. They are two separate things.

About InMemoryExternalCatalog, you can take a look at [1].
Csv has been renamed to OldCsv [2], but the recommendation is to use
the RFC-compliant `Csv` format in the dedicated
`flink-formats/flink-csv` module instead.


[1] 
https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
[2] 
https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala

Best, JingsongLee

--
From:Felipe Gutierrez 
Send Time: Wednesday, 26 June 2019, 20:58
To:JingsongLee 
Cc:user 
Subject:Re: Hello-world example of Flink Table API using a edited Calcite rule

Hi JingsongLee,

it is still not very clear to me. I imagine that I can create an 
InMemoryExternalCatalog and insert some tuples there (which will be in memory). 
Then I can use Calcite to use the values of my InMemoryExternalCatalog and 
change my plan. Is that correct?

Do you have an example of how to create an InMemoryExternalCatalog using Flink 
1.8? Because I guess the Csv [1] class does not exist anymore.

[1] 
https://github.com/srapisarda/stypes-flink/blob/feature/sql/src/test/scala/uk/ac/bbk/dcs/stypes/flink/CalciteEmptyConsistencyTest.scala#L73

Kind Regards,
Felipe


--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com

On Wed, Jun 26, 2019 at 1:36 PM JingsongLee  wrote:
Hi Felipe:
I think your approach is absolutely right. You can try to do some plan test 
just like [1].
You can find more CalciteConfigBuilder API test in [2].

1.https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala#L168
2.https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/calcite/CalciteConfigBuilderTest.scala

Best, JingsongLee

--
From:Felipe Gutierrez 
Send Time: Wednesday, 26 June 2019, 18:04
To:user 
Subject:Hello-world example of Flink Table API using a edited Calcite rule

Hi,

does someone have a simple example using the Table API and a Calcite rule which
changes/optimizes the query execution plan of a query in Flink?

From the official documentation, I know that I have to create a CalciteConfig
object [1]. Then, I based my first tests on this stackoverflow post [2] and I
implemented this piece of code:

 // change the current calcite config plan
 CalciteConfigBuilder ccb = new CalciteConfigBuilder();
 RuleSet ruleSets = RuleSets.ofList(FilterMergeRule.INSTANCE);
 ccb.addLogicalOptRuleSet(ruleSets);
 TableConfig tableConfig = new TableConfig();
 tableConfig.setCalciteConfig(ccb.build());

I suppose that with this I can change the query plan of the Flink Table API. I 
am also not sure if I will need to use an external catalog like this post 
assumes to use [3].
In a nutshell, I would like to have a simple example where I can execute a 
query using Flink Table API and change its query execution plan using a Calcite 
rule. Does anyone have a Hello world of it? I plan to use it on this example 
[4].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#query-optimization
[2] 
https://stackoverflow.com/questions/54278508/how-can-i-create-an-external-catalog-table-in-apache-flink
[3] 
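
A small sketch of wiring that snippet into a TableEnvironment so the added rule
set is actually used during optimization (Flink 1.8-era API; everything beyond the
CalciteConfigBuilder lines taken from the message above is an assumption):

import org.apache.calcite.rel.rules.FilterMergeRule;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.calcite.CalciteConfigBuilder;

public class CalcitePlanExample {  // hypothetical class, for illustration only

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Build the custom Calcite configuration, as in the snippet above...
        CalciteConfigBuilder ccb = new CalciteConfigBuilder();
        RuleSet ruleSets = RuleSets.ofList(FilterMergeRule.INSTANCE);
        ccb.addLogicalOptRuleSet(ruleSets);

        // ...and attach it to this TableEnvironment's config, so that queries
        // registered and optimized on tableEnv see the extra logical rule set.
        tableEnv.getConfig().setCalciteConfig(ccb.build());

        // define tables and queries on tableEnv from here on
    }
}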

Re: Hello-world example of Flink Table API using a edited Calcite rule

2019-06-27 Thread Felipe Gutierrez
Hi JingsongLee,

Sorry for not explaining it very well. I will try to clarify my idea.
1 - I want to use InMemoryExternalCatalog as a way to save some statistics
which I create by listening to a stream.
2 - Then I will have my core application using the Table API to execute some
aggregation/join.
3 - Because the application in 2 uses the Table API, I am able to influence its
plan through Calcite configuration rules. So, I am going to use the statistics
from 1 to change the rules dynamically in 2.

Do you think that is clear? And is it a feasible application with the current
capabilities of the Table API?
ps.: I am going to look at the links that you mentioned. Thanks for that!

Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Thu, Jun 27, 2019 at 7:23 AM JingsongLee  wrote:

> Hi Felipe:
>
> Yeah, you can use InMemoryExternalCatalog and CalciteConfig,
> but I don't quite understand what you mean.
> InMemoryExternalCatalog provides methods to create, drop, and
> alter (sub-)catalogs or tables. And CalciteConfig is for defining a
>  custom Calcite configuration. They are two separate things.
>
> About InMemoryExternalCatalog, you can take a look at [1].
> Csv has been renamed to OldCsv [2], but the recommendation is to use
> the RFC-compliant `Csv` format in the dedicated
> `flink-formats/flink-csv` module instead.
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
> [2]
> https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala
>
> Best, JingsongLee
>
> --
> From:Felipe Gutierrez 
> Send Time: Wednesday, 26 June 2019, 20:58
> To:JingsongLee 
> Cc:user 
> Subject:Re: Hello-world example of Flink Table API using a edited Calcite
> rule
>
> Hi JingsongLee,
>
> it is still not very clear to me. I imagine that I can create an
> InMemoryExternalCatalog and insert some tuples there (which will be in
> memory). Then I can use Calcite to use the values of my
> InMemoryExternalCatalog and change my plan. Is that correct?
>
> Do you have an example of how to create an InMemoryExternalCatalog using
> Flink 1.8? Because I guess the Csv [1] class does not exist anymore.
>
> [1]
> https://github.com/srapisarda/stypes-flink/blob/feature/sql/src/test/scala/uk/ac/bbk/dcs/stypes/flink/CalciteEmptyConsistencyTest.scala#L73
>
> Kind Regards,
> Felipe
>
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>
>
> On Wed, Jun 26, 2019 at 1:36 PM JingsongLee 
> wrote:
> Hi Felipe:
> I think your approach is absolutely right. You can try to do some plan
> test just like [1].
> You can find more CalciteConfigBuilder API test in [2].
>
> 1.
> https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala#L168
> 2.
> https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/calcite/CalciteConfigBuilderTest.scala
>
> Best, JingsongLee
>
> --
> From:Felipe Gutierrez 
> Send Time: Wednesday, 26 June 2019, 18:04
> To:user 
> Subject:Hello-world example of Flink Table API using a edited Calcite rule
>
> Hi,
>
> does someone have a simple example using the Table API and a Calcite rule
> which changes/optimizes the query execution plan of a query in Flink?
>
> From the official documentation, I know that I have to create a
> CalciteConfig object [1]. Then, I based my first tests on this
> stackoverflow post [2] and I implemented this piece of code:
>
> // change the current calcite config plan
> CalciteConfigBuilder ccb = new CalciteConfigBuilder();
> RuleSet ruleSets = RuleSets.ofList(FilterMergeRule.INSTANCE);
> ccb.addLogicalOptRuleSet(ruleSets);
> TableConfig tableConfig = new TableConfig();
> tableConfig.setCalciteConfig(ccb.build());
>
> I suppose that with this I can change the query plan of the Flink Table
> API. I am also not sure if I will need to use an external catalog like this
> post assumes to use [3].
> In a nutshell, I would like to have a simple example where I can execute a
> query using Flink Table API and change its query execution plan using a
> Calcite rule. Does anyone have a Hello world of it? I plan to use it on
> this example [4].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#query-optimization
> [2]
> https://stackoverflow.com/questions/54278508/how-can-i-create-an-external-catalog-table-in-apache-flink
> [3]
> https://stackoverflow.com/questions/54278508/how-can-i-create-an-external-catalog-table-in-apache-flink
> [4]
> 

Re: Batch mode with Flink 1.8 unstable?

2019-06-27 Thread Biao Liu
Hi Ken again,

In regard to the TimeoutException, I just realized that there is no
akka.remote.OversizedPayloadException in your log file. There might be some
other reason that caused this.
1. Have you tried increasing the configuration "akka.ask.timeout"? (a config
sketch follows below)
2. Have you checked the garbage collection of the JM/TM? You may need to
enable GC logging first.
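
Both knobs mentioned in this thread are plain flink-conf.yaml entries; a sketch
with purely illustrative values (the defaults are "10 s" and 10485760b):

# Give RPC calls such as requesting input splits more time before they time out.
akka.ask.timeout: 60 s
# Raise the maximum Akka message size if input splits (or other RPC payloads) are large.
akka.framesize: 52428800b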


Biao Liu wrote on Thursday, 27 June 2019 at 11:38 AM:

> Hi Ken,
>
> In regard to oversized input splits, it seems to be a rare case beyond my
> expectation. However, it should definitely be fixed, since input splits can
> be user-defined. We should not assume they must be small.
> I agree with Stephan that maybe there is something unexpectedly involved
> in the input splits.
> And there is also a work-around way to solve this before we fix it:
> increase the limit on the RPC message size by explicitly configuring
> "akka.framesize" in flink-conf.yaml.
>
> To @Qi, also sorry to hear about your bad experience. I'll take this issue,
> but I'm not sure I can catch up with the 1.9 release. Hope things go well.
>
>
> Stephan Ewen wrote on Wednesday, 26 June 2019 at 10:50 PM:
>
>> Hi Ken!
>>
>> Sorry to hear you are going through this experience. The major focus on
>> streaming so far means that the DataSet API has stability issues at scale.
>> So, yes, batch mode in current Flink version can be somewhat tricky.
>>
>> It is a big focus of Flink 1.9 to fix the batch mode, finally, and by
>> addressing batch specific scheduling / recovery / and shuffle issues.
>>
>> Let me go through the issues you found:
>>
>> *(1) Input splits and oversized RPC*
>>
>> Your explanation seems correct, timeout due to dropping oversized RPC
>> message.
>>
>> I don't quite understand how that exactly happens, because the size limit
>> is 10 MB and input splits should be rather small in most cases.
>> Are you running custom sources which put large data into splits? Maybe
>> accidentally, by having a large serialized closure in the splits?
>>
>> The fix would be this issue:
>> https://issues.apache.org/jira/browse/FLINK-4399
>>
>> *(2) TM early release*
>>
>> The 1.8 version had a fix that should work for regular cases without
>> fine-grained failure recovery.
>> 1.9 should have a more general fix that also works for fine-grained
>> recovery
>>
>> Are you trying to use the finer grained failover with the batch job?
>> The finer-grained failover is not working in batch for 1.8, that is why
>> it is not an advertised feature (it only works for streaming so far).
>>
>> The goal is that this works in the 1.9 release (aka the batch fixup
>> release)
>>
>> (3) Hang in Processing
>>
>> I think a thread dump (jstack) from the TMs would be helpful to diagnose
>> that.
>> There are known issues with the current batch shuffle implementation,
>> which is why 1.9 is getting a new bounded-blocking stream shuffle
>> implementation.
>>
>> Best,
>> Stephan
>>
>>
>>
>>
>>
>>
>> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler 
>> wrote:
>>
>>> Hi all,
>>>
>>> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink
>>> 1.8.0, and it regularly fails, but for varying reasons.
>>>
>>> Has anyone else had stability with 1.8.0 in batch mode and non-trivial
>>> workflows?
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>> *1. TimeoutException getting input splits*
>>>
>>> The batch job starts by processing a lot of files that live in S3.
>>> During this phase, I sometimes see:
>>>
>>> 2019-06-20 01:20:22,659 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
>>> DataSource (at createInput(ExecutionEnvironment.java:549)
>>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad
>>> dailies) -> Filter (Filter at
>>> createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at
>>> createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at
>>> createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key
>>> Extractor) -> Combine (Reduce at
>>> createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32)
>>> (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
>>> java.lang.RuntimeException: Could not retrieve next input split.
>>> at
>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>>> at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by:
>>> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
>>> Requesting the next input split failed.
>>> at
>>> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>>> at
>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>>> ... 3 more
>>> Caused by: java.util.concurrent.TimeoutException
>>> at
>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>> at
>>>