Re: Failed to build the newest flink

2018-08-30 Thread zhangmingleihe
Hi, Vino.

I tried the 1.6.0 tag source code, and it does not compile either.

> On Aug 31, 2018, at 11:47 AM, vino yang wrote:
> 
> Hi minglei,
> 
> That's strange. If the latest PRs against the Flink repository pass the tests, 
> that should indicate that Flink's current source code is fine.
> 
> As a comparison, you can check out the source code of the flink 1.6.0 tag and 
> see whether it compiles.
> 
> Thanks, vino.
> 
> zhangmingleihe <18717838...@163.com> wrote on Fri, Aug 31, 2018 at 11:33 AM:
> Hi, Vino.
> 
> Thanks. I can access the repo and tried the method you suggested, but the 
> build still fails.
> 
>> On Aug 31, 2018, at 11:08 AM, vino yang wrote:
>> 
>> Hi minglei,
>> 
>> Flink-mapr-fs requires a separate repository configuration:
>> 
>> 
>> <repository>
>>   <id>mapr-releases</id>
>>   <url>http://repository.mapr.com/maven/</url>
>>   <snapshots><enabled>false</enabled></snapshots>
>>   <releases><enabled>true</enabled></releases>
>> </repository>
>> 
>> You need to confirm that you can access it. As far as I know, some 
>> companies' internal Maven mirrors do not allow access to third-party 
>> repositories.
>> If you confirm that you have access, you can delete the relevant 
>> dependencies from your local repository and let Maven re-download them.
>> 
>> Thanks, vino.
>> 
>> zhangmingleihe <18717838...@163.com> wrote on Fri, Aug 31, 2018 at 10:23 AM:
>> Hi, 
>> 
>> When I build Flink with the newest code, I get the error below. I have never met 
>> this error before. Why? Since I can not download the 
>> 
>> [INFO] BUILD FAILURE
>> [INFO] 
>> 
>> [INFO] Total time: 01:48 min
>> [INFO] Finished at: 2018-08-31T10:15:24+08:00
>> [INFO] 
>> 
>> [ERROR] Failed to execute goal 
>> org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) 
>> on project flink-mapr-fs: Compilation failure: Compilation failure:
>> [ERROR] 
>> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44]
>>  package org.apache.hadoop.fs does not exist.
>> [ERROR] 
>> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45]
>>  symbol not found
>> [ERROR]   symbol:   class Configuration
>> [ERROR]   location: package org.apache.hadoop.conf
>> [ERROR] 
>> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,93]
>>  cannot find symbol
>> [ERROR]   symbol:   class Configuration
>> [ERROR]   location: package org.apache.hadoop.conf
>> [ERROR] -> [Help 1]
>> [ERROR]
>> 
>> Cheers
>> Minglei
> 



Table API: get Schema from TableSchema, TypeInformation[] or Avro

2018-08-30 Thread françois lacombe
Hi all,

Today I'm looking into deriving a Schema object from an Avro schema JSON
string.
In the overview at
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html
Avro is used as a format and never as a schema.

This was a topic in JIRA-9813
I can get a TableSchema with TableSchema schema =
TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(sch_csv.toString()));
but I can't use it with BatchTableDescriptor.withSchema().

How can I get a Schema from TableSchema, TypeInformation[] or even Avro
json string?
A small bridge between TableSchema and
org.apache.flink.table.descriptors.Schema seems to be missing; a rough idea of
what I mean is sketched below.
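
For illustration, something like the following sketch is what I have in mind
(the class and method names are mine, assuming Flink 1.6 with flink-avro on the
classpath):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.Schema;

public class AvroToDescriptorSchema {

    // Build a descriptors.Schema from an Avro schema JSON string by copying
    // every column name/type pair of the derived TableSchema.
    public static Schema fromAvroSchema(String avroSchemaJson) {
        TypeInformation<?> rowType = AvroSchemaConverter.convertToTypeInfo(avroSchemaJson);
        TableSchema tableSchema = TableSchema.fromTypeInfo(rowType);

        Schema descriptor = new Schema();
        String[] names = tableSchema.getColumnNames();
        TypeInformation<?>[] types = tableSchema.getTypes();
        for (int i = 0; i < names.length; i++) {
            descriptor.field(names[i], types[i]);
        }
        return descriptor;
    }
}

The resulting Schema could then be handed to BatchTableDescriptor.withSchema(...).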

Thanks in advance for any useful hint

François


Re: Configuring Ports for Job/Task Manager Metrics

2018-08-30 Thread Deirdre Kong
Hi Vino/Chesnay,

Thank you for the info.

I am actually using Yarn for deployment.  Flink is installed in AWS EMR, so
sometimes jm and tm processes are deployed in the same container, sometimes
they are deployed in different containers.  I would need to point Prometheus at
a specific port for the JM and TM reporters, so that even if the JM is bounced,
I can get the JM metrics on the same port each time.

Can you elaborate on what you mean by specifying a separate Flink installation
package for it?

Chesnay, do you have any insights on this?

Thanks,
Deirdre

On Thu, Aug 30, 2018 at 7:18 PM vino yang  wrote:

> Hi Deirdre,
>
> If you run multiple Flink component (jm/tm) processes on one physical
> node, it is recommended to specify a port range to avoid conflicts [1]; I
> assume this is the case when they are started from the same Flink binary installation.
> If you want to always have the TM reporter running on the same port, you
> can specify a separate Flink installation package for it, explicitly
> specifying this port in the configuration file for this installation
> package.
> However, you still need to pay attention to port conflicts with other
> components.
>
> The issue you referenced is handled by Chesnay, so his opinion is probably
> more appropriate. Pinging Chesnay for you.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter
>
> Thanks, vino.
>
> Deirdre Kong wrote on Fri, Aug 31, 2018 at 5:25 AM:
>
>> Hi,
>>
>> I am running Flink in Amazon EMR.  In flink-conf.yaml, I have 
>> `metrics.reporter.prom.port:
>> 9249-9250`
>> Depending on whether the job manager and task manager are running in the
>> same node, the task manager metrics are reported on port 9250 (if running
>> on same node as job manager), or on port 9249 (if running on a different
>> node).
>>
>> Is there a way to configure so that the task manager metrics are always
>> reported on port 9250?
>> I saw a post  that we
>> can "provide each *Manager with a separate configuration."  How to do that?
>>
>> Thanks
>>
>


Re: Failed to build the newest flink

2018-08-30 Thread vino yang
Hi minglei,

That's strange. If the latest PRs against the Flink repository pass the tests,
that should indicate that Flink's current source code is fine.

As a comparison, you can check out the source code of the flink 1.6.0 tag
and see whether it compiles.

Thanks, vino.

zhangmingleihe <18717838...@163.com> wrote on Fri, Aug 31, 2018 at 11:33 AM:

> Hi, Vino.
>
> Thanks. I can access the repo and tried the method you suggested, but the
> build still fails.
>
> On Aug 31, 2018, at 11:08 AM, vino yang wrote:
>
> Hi minglei,
>
> Flink-mapr-fs requires a separate repository configuration:
>
> <repository>
>   <id>mapr-releases</id>
>   <url>http://repository.mapr.com/maven/</url>
>   <snapshots><enabled>false</enabled></snapshots>
>   <releases><enabled>true</enabled></releases>
> </repository>
>
>
> You need to confirm that you can access it. As far as I know, some
> companies' internal Maven mirrors do not allow access to third-party
> repositories.
> If you confirm that you have access, you can delete the relevant
> dependencies from your local repository and let Maven re-download them.
>
> Thanks, vino.
>
> zhangmingleihe <18717838...@163.com> wrote on Fri, Aug 31, 2018 at 10:23 AM:
>
>> Hi,
>>
>> When I build Flink with the newest code, I get the error below. I have
>> never met this error before. Why? Since I can not download the
>>
>> [INFO] BUILD FAILURE
>> [INFO]
>> 
>> [INFO] Total time: 01:48 min
>> [INFO] Finished at: 2018-08-31T10:15:24+08:00
>> [INFO]
>> 
>> [ERROR] Failed to execute goal
>> org.apache.maven.plugins:maven-compiler-plugin:3.1:compile
>> (default-compile) on project flink-mapr-fs: Compilation failure:
>> Compilation failure:
>> [ERROR]
>> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44]
>> package org.apache.hadoop.fs does not exist.
>> [ERROR]
>> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45]
>> symbol not found
>> [ERROR]   symbol:   class Configuration
>> [ERROR]   location: package org.apache.hadoop.conf
>> [ERROR]
>> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,93]
>> cannot find symbol
>> [ERROR]   symbol:   class Configuration
>> [ERROR]   location: package org.apache.hadoop.conf
>> [ERROR] -> [Help 1]
>> [ERROR]
>>
>> Cheers
>> Minglei
>>
>
>


Re: Failed to build the newest flink

2018-08-30 Thread zhangmingleihe
Hi, Vino.

Thanks. I can access the repo and tried the method you suggested, but the build 
still fails.

> On Aug 31, 2018, at 11:08 AM, vino yang wrote:
> 
> Hi minglei,
> 
> Flink-mapr-fs requires a separate repository configuration:
> 
> <repository>
>   <id>mapr-releases</id>
>   <url>http://repository.mapr.com/maven/</url>
>   <snapshots><enabled>false</enabled></snapshots>
>   <releases><enabled>true</enabled></releases>
> </repository>
> 
> You need to confirm that you can access it. As far as I know, some 
> companies' internal Maven mirrors do not allow access to third-party 
> repositories.
> If you confirm that you have access, you can delete the relevant 
> dependencies from your local repository and let Maven re-download them.
> 
> Thanks, vino.
> 
> zhangmingleihe <18717838...@163.com> wrote on Fri, Aug 31, 2018 at 10:23 AM:
> Hi, 
> 
> When I build Flink with the newest code, I get the error below. I have never met 
> this error before. Why? Since I can not download the 
> 
> [INFO] BUILD FAILURE
> [INFO] 
> 
> [INFO] Total time: 01:48 min
> [INFO] Finished at: 2018-08-31T10:15:24+08:00
> [INFO] 
> 
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) 
> on project flink-mapr-fs: Compilation failure: Compilation failure:
> [ERROR] 
> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44]
>  package org.apache.hadoop.fs does not exist.
> [ERROR] 
> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45]
>  symbol not found
> [ERROR]   symbol:   class Configuration
> [ERROR]   location: package org.apache.hadoop.conf
> [ERROR] 
> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,93]
>  cannot find symbol
> [ERROR]   symbol:   class Configuration
> [ERROR]   location: package org.apache.hadoop.conf
> [ERROR] -> [Help 1]
> [ERROR]
> 
> Cheers
> Minglei



Re: Failed to build the newest flink

2018-08-30 Thread vino yang
Hi minglei,

Flink-mapr-fs requires a separate repository configuration:


   
<repository>
  <id>mapr-releases</id>
  <url>http://repository.mapr.com/maven/</url>
  <snapshots><enabled>false</enabled></snapshots>
  <releases><enabled>true</enabled></releases>
</repository>



You need to confirm that you can access it. As far as I know, some
companies' internal Maven mirrors do not allow access to third-party
repositories.
If you confirm that you have access, you can delete the relevant
dependencies from your local repository and let Maven re-download them.
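
For example, something along these lines (assuming the cached MapR artifacts
live under the com.mapr group in your local repository; adjust the path if
yours differ) forces a re-download and rebuilds only the affected module:

# drop the possibly incomplete MapR artifacts from the local Maven repository
rm -rf ~/.m2/repository/com/mapr
# rebuild flink-mapr-fs plus the modules it depends on
mvn clean install -DskipTests -pl flink-filesystems/flink-mapr-fs -am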

Thanks, vino.

zhangmingleihe <18717838...@163.com> wrote on Fri, Aug 31, 2018 at 10:23 AM:

> Hi,
>
> When I build Flink with the newest code, I get the error below. I have
> never met this error before. Why? Since I can not download the
>
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 01:48 min
> [INFO] Finished at: 2018-08-31T10:15:24+08:00
> [INFO]
> 
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.1:compile
> (default-compile) on project flink-mapr-fs: Compilation failure:
> Compilation failure:
> [ERROR]
> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44]
> package org.apache.hadoop.fs does not exist.
> [ERROR]
> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45]
> symbol not found
> [ERROR]   symbol:   class Configuration
> [ERROR]   location: package org.apache.hadoop.conf
> [ERROR]
> /Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,93]
> cannot find symbol
> [ERROR]   symbol:   class Configuration
> [ERROR]   location: package org.apache.hadoop.conf
> [ERROR] -> [Help 1]
> [ERROR]
>
> Cheers
> Minglei
>


Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

2018-08-30 Thread vino yang
Hi Laura,

First of all, Flink only keeps one completed checkpoint by default [1]. You
need to confirm whether the number of files is consistent with your
configuration. If it is consistent, then the cause is something else:

1) The cleanup of completed checkpoints is done by the JM. You need to
confirm whether it can access your files. [2]
2) The JM asynchronously cleans up the metadata of old completed checkpoints
in ZooKeeper with a background thread, and only after that cleanup succeeds
does it delete the checkpoint data. If the above reasons are excluded, then
maybe you can provide the JM's log to help us confirm whether this is the
cause. I think it is more appropriate to ping Till. [3]

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
[2]:
https://stackoverflow.com/questions/44928624/apache-flink-not-deleting-old-checkpoints
[3]:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java#L437
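
For reference, the setting from [1] is a single flink-conf.yaml entry (shown
here with its default value):

state.checkpoints.num-retained: 1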

Thanks, vino.

Laura Uzcátegui wrote on Thu, Aug 30, 2018 at 10:52 PM:

> Hello,
>
>  At work, we are currently standing up a cluster with the following
> configuration:
>
>
>- Flink version: 1.4.2
>- HA Enabled with Zookeeper
>- State backend : rocksDB
>- state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
>- state.backend.rocksdb.checkpointdir:
>hdfs://namenode:9000/flink/checkpoints
>- *high-availability.storageDir*: hdfs://namenode:9000/flink/recovery
>
> We also have a job running with checkpointing enabled and without
> externalized checkpoints.
>
> We run this job multiple times a day from our integration-test pipeline,
> and we started noticing that the *high-availability.storageDir* folder
> storing the completedCheckpoint files keeps accumulating new files, which
> makes us wonder whether there is any cleanup policy for the filesystem when
> HA is enabled.
>
> Under what circumstances would there be an ever-increasing number of
> completedCheckpoint files in the HA storage dir when there is only a single
> job running over and over again?
>
> Here is a list of what we are seeing accumulate over time, eventually
> reaching the maximum number of files allowed on the filesystem.
>
> completedCheckpoint00d86c01d8b9
> completedCheckpoint00d86e9030a9
> completedCheckpoint00d877b74355
> completedCheckpoint00d87b3dd9ad
> completedCheckpoint00d8815d9afd
> completedCheckpoint00d88973195c
> completedCheckpoint00d88b4792f2
> completedCheckpoint00d890d499dc
> completedCheckpoint00d91b00ada2
>
>
> Cheers,
>
>
> Laura U.
>
>


Failed to build the newest flink

2018-08-30 Thread zhangmingleihe
Hi, 

When I build Flink with the newest code, I get the error below. I have never met 
this error before. Why? Since I can not download the 

[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 01:48 min
[INFO] Finished at: 2018-08-31T10:15:24+08:00
[INFO] 
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on 
project flink-mapr-fs: Compilation failure: Compilation failure:
[ERROR] 
/Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44]
 package org.apache.hadoop.fs does not exist.
[ERROR] 
/Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45]
 symbol not found
[ERROR]   symbol:   class Configuration
[ERROR]   location: package org.apache.hadoop.conf
[ERROR] 
/Users/zhangminglei/project/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,93]
 cannot find symbol
[ERROR]   symbol:   class Configuration
[ERROR]   location: package org.apache.hadoop.conf
[ERROR] -> [Help 1]
[ERROR]

Cheers
Minglei


Re: Configuring Ports for Job/Task Manager Metrics

2018-08-30 Thread vino yang
Hi Deirdre,

If you run multiple Flink component (jm/tm) processes on one physical node,
it is recommended to specify a port range to avoid conflicts [1]; I assume
this is the case when they are started from the same Flink binary installation.
If you want to always have the TM reporter running on the same port, you
can specify a separate Flink installation package for it, explicitly
specifying this port in the configuration file for this installation
package.
However, you still need to pay attention to port conflicts with other
components.
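
For illustration, the reporter section of the two flink-conf.yaml files could
then differ only in the port (the reporter name and ports here are just
examples):

# flink-conf.yaml of the installation that only starts the JobManager
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249

# flink-conf.yaml of the installation that only starts TaskManagers
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250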

The issue you referenced is handled by Chesnay, so his opinion is probably
more appropriate. Pinging Chesnay for you.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter

Thanks, vino.

Deirdre Kong wrote on Fri, Aug 31, 2018 at 5:25 AM:

> Hi,
>
> I am running Flink in Amazon EMR.  In flink-conf.yaml, I have 
> `metrics.reporter.prom.port:
> 9249-9250`
> Depending on whether the job manager and task manager are running in the same
> node, the task manager metrics are reported on port 9250 (if running on
> same node as job manager), or on port 9249 (if running on a different node).
>
> Is there a way to configure so that the task manager metrics are always
> reported on port 9250?
> I saw a post  that we
> can "provide each *Manager with a separate configuration."  How to do that?
>
> Thanks
>


Re: test windows

2018-08-30 Thread Hequn Cheng
Hi Nicos,
Do you want an IT test to test the job, or just a UT to test the window
operator?

If you want to test the job, there are examples here:
1. WordCountITCase.java

2. WindowFunctionITCase.scala


Also, there are UT tests using test harness to test a window operator:
1. WindowOperatorTest.java


Best, Hequn

On Thu, Aug 30, 2018 at 9:04 PM Nicos Maris  wrote:

> Can the OneInputStreamOperatorTestHarness
> 
> be used to test pipelines with time windows?
>
> Can someone explain this test harness preferably with a tiny example?
>
> I see here
> 
> that "it is currently not a part of public API and can be subject to
> change", but I assume it is the only thing I can get from flink as regards
> automated testing of pipelines with time windows.
>
> On Tue, Aug 28, 2018 at 3:12 PM Nicos Maris  wrote:
>
>> Hi Zhengwen,
>>
>>
>> I have tested my job manually (both by submitting a job and through
>> execute()) and I am trying to write a test.
>>
>> The following project states that it has the feature "Test stream
>> windowing with timestamped input" but I do not want to rely on a project
>> other than flink.
>>
>> https://github.com/ottogroup/flink-spector
>>
>> The following folder doesn't seem to demonstrate such capability for
>> testing.
>>
>>
>> https://github.com/apache/flink/tree/master/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test
>>
>>
>>
>> On Tue, Aug 28, 2018 at 2:55 PM ZhengWen ZHU  wrote:
>>
>>> Hi Nicos,
>>> Under the flink-example module, there are many examples, including batch
>>> and streaming.  You can build the project from source; that way you will
>>> find many jars under the target directories. You can submit these jars
>>> to the Flink cluster. Also, you can run these examples directly from your
>>> IDE in standalone mode.
>>>
>>> best,
>>> Zhengwen
>>>
>>> On Tue, Aug 28, 2018 at 7:35 PM Nicos Maris 
>>> wrote:
>>>
 Hi all,


 How can I test in Java any streaming job that has a time window?


 best,
 Nicos

>>>
>>>
>>> --
>>>
>>>
>>>
>>>
>>> Zhu Zhengwen
>>> State Key Laboratory for Novel Software Technology
>>> Dept. of Computer Sci. & Tech., Nanjing University
>>> 9, Hankou Road, Nanjing, China.
>>>
>>


Configuring Ports for Job/Task Manager Metrics

2018-08-30 Thread Deirdre Kong
Hi,

I am running Flink in Amazon EMR.  In flink-conf.yaml, I have
`metrics.reporter.prom.port:
9249-9250`
Depending on whether the job manager and task manager are running in the same
node, the task manager metrics are reported on port 9250 (if running on
same node as job manager), or on port 9249 (if running on a different node).

Is there a way to configure so that the task manager metrics are always
reported on port 9250?
I saw a post  that we can
"provide each *Manager with a separate configuration."  How to do that?

Thanks


Re: Problem with querying state on Flink 1.6.

2018-08-30 Thread Eron Wright
I took a brief look as to why the queryable state server would bind to the
loopback address.   Both the qs server and the
org.apache.flink.runtime.io.network.netty.NettyServer do bind the local
address based on the TM address.  That address is based on the
"taskmanager.hostname" configuration override and, by default, the
RpcService address.

A possible explanation is that, on Joe's machine, Java's
`InetAddress.getLocalHost()` resolves to the loopback address.  I believe
there's some variation in Java's behavior in that regard.
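
If that is what happens, one possible workaround (untested, and the exact key
name may differ between Flink versions) is to set the TM hostname explicitly in
flink-conf.yaml so that it does not fall back to InetAddress.getLocalHost():

taskmanager.hostname: tm-1.example.com   # placeholder; use an externally resolvable name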

Hope this helps!

On Thu, Aug 30, 2018 at 1:27 AM Till Rohrmann  wrote:

> Hi Joe,
>
> it looks as if the queryable state server binds to the local loopback
> address. This looks like a bug to me. Could you maybe share the complete
> cluster entrypoint and the task manager logs with me?
>
> In the meantime you could try to do the following: Change
> AbstractServerBase.java:227 into `.localAddress(port)`. This should bind to
> any local address. Now you need to build your own Flink distribution by
> running `mvn clean package -DskipTests` and then go to either build-target
> or flink-dist/target/flink-1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT to find the
> distribution.
>
> Cheers,
> Till
>
> On Thu, Aug 30, 2018 at 12:12 AM Joe Olson  wrote:
>
>> I'm having a problem with querying state on Flink 1.6.
>>
>> I put a project in Github that is my best representation of the very
>> simple client example outlined in the 'querying state' section of the 1.6
>> documentation at
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>> . The Github project is at https://github.com/jolson787/qs
>>
>> My problem: I know the query server and proxy server have started on my 1
>> job manager / 1 task manager Flink 1.6 test rig, because I see the 'Started
>> Queryable State Server' and 'Started Queryable State Proxy Server' in the
>> task manager logs. I know the ports are open on the local machine, because
>> I can telnet to them.
>>
>> From a remote machine, I implemented the QueryableStateClient as in the
>> example, and made a getKVState call. Nothing I seem to do between that or
>> the getKVstate call seems to register...no response, no errors thrown, no
>> lines in the log, no returned futures, no timeouts, etc. I know the proxy
>> server and state server ports are NOT open to the remote machine, yet the
>> client still doesn't seem to react.
>>
>> Can someone take a quick look at my very simple Github project and see if
>> anything jumps out at them? Beer is on me at Flink Forward if someone can
>> help me work through this
>>
>


Re: withFormat(Csv) is undefined for the type BatchTableEnvironment

2018-08-30 Thread françois lacombe
Hi

It's all good, I had misunderstood some points in the example code.
Everything is working fine with BatchTableDescriptor.

All the best

François

2018-08-30 11:40 GMT+02:00 Timo Walther :

> Hi François,
>
> you should read the documentation from top to bottom. The overview part
> [1] explains how everything plays together with examples.
>
> Regards,
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/connect.html#overview
>
> On 30.08.18 at 10:41, Till Rohrmann wrote:
>
> Hi François,
>
> as Vino said, the BatchTableEnvironment does not provide a `withFormat`
> method. Admittedly, the documentation does not state it too explicitly but
> you can only call the `withFormat` method on a table connector as indicated
> here [1]. If you think that you need to get the data from somewhere first
> before defining a format, then it becomes clear that you first need to
> define a connector.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/table/connect.html#table-formats
>
> Cheers,
> Till
>
> On Thu, Aug 30, 2018 at 4:46 AM vino yang  wrote:
>
>> Hi francois,
>>
>> Maybe you can refer to the comments of this source code?[1]
>>
>> https://github.com/apache/flink/blob/master/flink-
>> libraries/flink-table/src/main/scala/org/apache/flink/table/api/
>> BatchTableEnvironment.scala#L143
>>
>> Thanks, vino.
>>
>> françois lacombe wrote on Wed, Aug 29, 2018 at 10:54 PM:
>>
>>> Hi Vino,
>>>
>>> Thanks for this answer.
>>> I can't find in the docs where it's about BatchTableDescriptor
>>> https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.6/dev/table/connect.html#csv-format
>>>
>>> It sounds like the withFormat method is applied on TableEnvironment
>>> object on this page.
>>>
>>> All the best
>>>
>>> François
>>>
>>> 2018-08-28 4:37 GMT+02:00 vino yang :
>>>
 Hi Francois,

 Yes, the withFormat API comes from an instance of BatchTableDescriptor,
 and the BatchTableDescriptor instance is returned by the connect API, so
 you should call BatchTableEnvironment#connect first.

 Thanks, vino.

 françois lacombe wrote on Mon, Aug 27, 2018 at 10:26 PM:

> Hi all,
>
> I'm currently trying to load a CSV file content with Flink 1.6.0 table
> API.
> This error is raised as a try to execute the code written in docs
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/table/connect.html#csv-format
>
> ExecutionEnvironment env = ExecutionEnvironment.
> getExecutionEnvironment();
> BatchTableEnvironment tEnv = TableEnvironment.
> getTableEnvironment(env);
> tEnv.withFormat(new Csv(...));
>
> > Exception in thread "main" java.lang.Error: Unresolved compilation
> problem:
>The method withFormat(Csv) is undefined for the type
> BatchTableEnvironment
>
> Am I wrong?
>
> Thanks in advance for any hint
>
> François
>

>>>
>


CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

2018-08-30 Thread Laura Uzcátegui
Hello,

 At work, we are currently standing up a cluster with the following
configuration:


   - Flink version: 1.4.2
   - HA Enabled with Zookeeper
   - State backend : rocksDB
   - state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
   - state.backend.rocksdb.checkpointdir:
   hdfs://namenode:9000/flink/checkpoints
   - *high-availability.storageDir*: hdfs://namenode:9000/flink/recovery

We also have a job running with checkpointing enabled and without
externalized checkpoints.

We run this job multiple times a day from our integration-test pipeline,
and we started noticing that the *high-availability.storageDir* folder
storing the completedCheckpoint files keeps accumulating new files, which
makes us wonder whether there is any cleanup policy for the filesystem when
HA is enabled.

Under what circumstances would there be an ever-increasing number of
completedCheckpoint files in the HA storage dir when there is only a single
job running over and over again?

Here is a list of what we are seeing accumulate over time, eventually
reaching the maximum number of files allowed on the filesystem.

completedCheckpoint00d86c01d8b9
completedCheckpoint00d86e9030a9
completedCheckpoint00d877b74355
completedCheckpoint00d87b3dd9ad
completedCheckpoint00d8815d9afd
completedCheckpoint00d88973195c
completedCheckpoint00d88b4792f2
completedCheckpoint00d890d499dc
completedCheckpoint00d91b00ada2


Cheers,


Laura U.


Re: test windows

2018-08-30 Thread Nicos Maris
Can the OneInputStreamOperatorTestHarness

be used to test pipelines with time windows?

Can someone explain this test harness preferably with a tiny example?

I see here

that "it is currently not a part of public API and can be subject to
change", but I assume it is the only thing I can get from flink as regards
automated testing of pipelines with time windows.

On Tue, Aug 28, 2018 at 3:12 PM Nicos Maris  wrote:

> Hi Zhengwen,
>
>
> I have tested my job manually (both by submitting a job and through
> execute()) and I am trying to write a test.
>
> The following project states that it has the feature "Test stream
> windowing with timestamped input" but I do not want to rely on a project
> other than flink.
>
> https://github.com/ottogroup/flink-spector
>
> The following folder doesn't seem to demonstrate such capability for
> testing.
>
>
> https://github.com/apache/flink/tree/master/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test
>
>
>
> On Tue, Aug 28, 2018 at 2:55 PM ZhengWen ZHU  wrote:
>
>> Hi Nicos,
>> Under the flink-example module, there are many examples, including batch
>> and streaming.  You can build the project from source; that way you will
>> find many jars under the target directories. You can submit these jars
>> to the Flink cluster. Also, you can run these examples directly from your
>> IDE in standalone mode.
>>
>> best,
>> Zhengwen
>>
>> On Tue, Aug 28, 2018 at 7:35 PM Nicos Maris 
>> wrote:
>>
>>> Hi all,
>>>
>>>
>>> How can I test in Java any streaming job that has a time window?
>>>
>>>
>>> best,
>>> Nicos
>>>
>>
>>
>> --
>>
>>
>>
>>
>> Zhu Zhengwen
>> State Key Laboratory for Novel Software Technology
>> Dept. of Computer Sci. & Tech., Nanjing University
>> 9, Hankou Road, Nanjing, China.
>>
>


Re: Grok and Flink

2018-08-30 Thread Aarti Gupta
Interesting, thanks Lehuede. Will take a look.

--Aarti

On Thu, Aug 30, 2018 at 5:59 PM, Lehuede sebastien 
wrote:

> Hi,
>
> To parse my logs and reuse all my Grok patterns, I use the Java Grok API
> directly in my DataStream. Please see : https://github.com/thekrakken/
> java-grok
>
> With that you should be able to get rid of the full Logstash piece and use
> only the Grok part.
>
> Another solution: if you have logs/events in CEF format, for example, you
> can just use 'split' in a flatMap function.
>
> Hope this helps.
>
> Regards,
> Sebastien.
>



-- 
Aarti Gupta 
Director, Engineering, Correlation


aagu...@qualys.com
T


Qualys, Inc. – Blog  | Community
 | Twitter 





Re: Grok and Flink

2018-08-30 Thread Lehuede sebastien
 Hi,

To parse my logs and reuse all my Grok patterns, I use the Java Grok API
directly in my DataStream. Please see:
https://github.com/thekrakken/java-grok

With that you should be able to get rid of the full Logstash piece and use
only the Grok part.
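
For illustration, a rough sketch of that approach (assuming the io.krakens
java-grok API and the bundled COMBINEDAPACHELOG pattern; adapt the pattern to
your own log format):

import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.Match;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.Map;

public class GrokParser extends RichFlatMapFunction<String, Map<String, Object>> {

    private transient Grok grok;  // built in open() so it never has to be serialized

    @Override
    public void open(Configuration parameters) {
        GrokCompiler compiler = GrokCompiler.newInstance();
        compiler.registerDefaultPatterns();               // register the bundled patterns
        grok = compiler.compile("%{COMBINEDAPACHELOG}");  // pick the pattern matching your source
    }

    @Override
    public void flatMap(String line, Collector<Map<String, Object>> out) {
        Match match = grok.match(line);
        Map<String, Object> fields = match.capture();
        if (!fields.isEmpty()) {
            out.collect(fields);  // lines that do not match the pattern are dropped
        }
    }
}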

Another solution: if you have logs/events in CEF format, for example, you
can just use 'split' in a flatMap function.

Hope this helps.

Regards,
Sebastien.


What are the general reasons for a Flink Task Manager to crash? How to troubleshoot?

2018-08-30 Thread HarshithBolar
We're running Flink on a 5 node Flink cluster with two Job Managers and three
Task Managers.

Of late, we're facing this issue where once every day or so, all three task
managers get killed, making the number of available task slots 0 and causing all
the jobs running on that cluster to fail. The only resolution is to manually
restart the Task Managers.

So I wanted to know some of the typical reasons that can bring down a Task
Manager. And if there is a way to automatically bring them back up without
manual intervention.

Additional info: The jobs running on the cluster read data from Kafka and
write data to Kafka/Cassandra.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Grok and Flink

2018-08-30 Thread Aarti Gupta
Hi,

We are using the Grok filter in Logstash to parse and enrich our data. Grok
provides inbuilt parsing for common log sources such as Apache, which allows
us to add structure to unstructured data.

After the data has been parsed in Logstash, we then stream the data over
Kafka to Flink for further CEP processing.

We are looking to see if we can get rid of the Logstash piece and do all of
the data enrichment and parsing in Flink.

Our question - does Flink have an inbuilt library similar to Grok that
provides out of the box parsing for common log formats.

Thanks in advance,
Aarti

-- 
Aarti Gupta 
Director, Engineering, Correlation


aagu...@qualys.com
T


Qualys, Inc. – Blog  | Community
 | Twitter 





Re: WriteTimeoutException in Cassandra sink kill the job

2018-08-30 Thread Till Rohrmann
Hi Jayant,

AFAIK it is currently not possible to control how failures are handled in
the Cassandra Sink. What would be the desired behaviour? The best thing is
to open a JIRA issue to discuss potential improvements.

Cheers,
Till

On Thu, Aug 30, 2018 at 12:15 PM Jayant Ameta  wrote:

> Hi,
> During high volumes, cassandra sink fails with the following error:
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra
> timeout during write query at consistency SERIAL (2 replica were required
> but only 1 acknowledged the write)
>
> Is there a way to configure the sink to ignore/handle this error?
>
> Jayant
>


WriteTimeoutException in Cassandra sink kill the job

2018-08-30 Thread Jayant Ameta
Hi,
During high volumes, cassandra sink fails with the following error:
com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra
timeout during write query at consistency SERIAL (2 replica were required
but only 1 acknowledged the write)

Is there a way to configure the sink to ignore/handle this error?

Jayant


Re: Using a ProcessFunction as a "Source"

2018-08-30 Thread Aljoscha Krettek
Hi Addison,

for a while now different ideas about reworking the Source interface have been 
floated. I implemented a prototype that showcases my favoured approach for such 
a new interface: 
https://github.com/aljoscha/flink/commits/refactor-source-interface 


This basically splits the Source into two parts: a SplitEnumerator and a 
SplitReader. The enumerator is responsible for discovering what should be read 
and the reader is responsible for reading splits. In this model, the 
SplitReader does not necessarily have to sit at the beginning of the pipeline, 
it could sit somewhere in the middle and the splits don't have to necessarily 
come from the enumerator but could come from a different source.

I think this could fit the use case that you're describing.

Best,
Aljoscha

> On 25. Aug 2018, at 11:45, Chesnay Schepler  wrote:
> 
> The SourceFunction interface is rather flexible so you can do pretty much 
> whatever you want. Exact implementation depends on whether control messages 
> are pulled or pushed to the source; in the first case you'd simply block 
> within "run()" on the external call, in the latter you'd have it block on a 
> queue of some sort that is fed by another thread waiting for messages.
> 
> AFAIK you should never use the collector outside of "processElement".
> 
> On 25.08.2018 05:15, vino yang wrote:
>> Hi Addison,
>> 
>> I have a lot of things I don't understand. Is your source self-generated 
>> message? Why can't source receive input? If the source is unacceptable then 
>> why is it called source? Isn't kafka-connector the input as source?
>> 
>> If you mean that under normal circumstances it can't receive another input 
>> about control messages, there are some ways to solve it.
>> 
>> 1) Access external systems in your source to get or subscribe to control 
>> messages, such as Zookeeper.
>> 2) If your source is followed by a map or process operator, then they can be 
>> chained together as a "big" source, then you can pass your control message 
>> via Flink's new feature "broadcast state". See this blog post for details.[1]
>> 3) Mix control messages with normal messages in the same message flow. After 
>> the control message is parsed, the corresponding action is taken. Of course, 
>> this kind of program is not very recommended.
>> 
>> [1]: 
>> https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
>>  
>> 
>> 
>> Thanks, vino.
>> 
>> Addison Higham <addis...@gmail.com> wrote on Sat, Aug 25, 2018 at 12:46 AM:
>> HI,
>> 
>> I am writing a parallel source function that ideally needs to receive some 
>> messages as control information (specifically, a state message on where to 
>> start reading from a kinesis stream). As far as I can tell, there isn't a 
>> way to make a sourceFunction receive input (which makes sense) so I am 
>> thinking it makes sense to use a processFunction that will occasionally 
>> receive control messages and mostly just output a lot of messages.
>> 
>> This works from an API perspective, with a few different options, I could 
>> either:
>> 
>> A) have the processElement function block on calling the loop that will 
>> produce messages or
>> B) have the processEelement function return (by pass the collector and 
>> starting the reading on a different thread), but continue to produce 
>> messages downstream
>> 
>> This obviously does raise some concerns though:
>> 
>> 1. Does this break any assumptions flink has of message lifecycle? Option A 
>> of blocking on processElement for very long periods seems straight forward 
>> but less than ideal, not to mention not being able to handle any other 
>> control messages.
>> 
>> However, I am not sure if a processFunction sending messages after the 
>> processElement function has returned would break some expectations flink has 
>> of operator lifeycles. Messages are also emitted by timers, etc, but this 
>> would be completely outside any of those function calls as it is started on 
>> another thread. This is obviously how most SourceFunctions work, but it 
>> isn't clear if the same technique holds for ProcessFunctions
>> 
>> 2. Would this have a negative impact on backpressure downstream? Since I am 
>> still going to be using the same collector instance, it seems like it should 
>> ultimately work, but I wonder if there are other details I am not aware of.
>> 
>> 3. Is this just a terrible idea in general? It seems like I could maybe do 
>> this by implementing directly on top of an Operator, but I am not as 
>> familiar with that API
>> 
>> Thanks in advance for any thoughts!
>> 
>> Addison
> 
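
For reference, a minimal sketch of the queue-fed SourceFunction approach
Chesnay describes above (the queue and whatever feeds it are purely
illustrative, not part of any Flink API):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class QueueBackedSource implements SourceFunction<String> {

    private volatile boolean running = true;

    // In a real job this queue would be filled by a thread (started e.g. in run())
    // that waits for the external control messages.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // Block with a timeout so that cancel() is honoured promptly.
            String msg = queue.poll(100, TimeUnit.MILLISECONDS);
            if (msg != null) {
                // Emit under the checkpoint lock, as sources are expected to do.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(msg);
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}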



Re: withFormat(Csv) is undefined for the type BatchTableEnvironment

2018-08-30 Thread Timo Walther

Hi François,

you should read the documentation from top to bottom. The overview part 
[1] explains how everything plays together with examples.


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#overview


On 30.08.18 at 10:41, Till Rohrmann wrote:

Hi François,

as Vino said, the BatchTableEnvironment does not provide a 
`withFormat` method. Admittedly, the documentation does not state it 
too explicitly but you can only call the `withFormat` method on a 
table connector as indicated here [1]. If you think that you need to 
get the data from somewhere first before defining a format, then it 
becomes clear that you first need to define a connector.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#table-formats


Cheers,
Till

On Thu, Aug 30, 2018 at 4:46 AM vino yang > wrote:


Hi francois,

Maybe you can refer to the comments of this source code?[1]


https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala#L143

Thanks, vino.

françois lacombe <francois.laco...@dcbrain.com> wrote on Wed, Aug 29, 2018 at 10:54 PM:

Hi Vino,

Thanks for this answer.
I can't find in the docs where it's about BatchTableDescriptor

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format

It sounds like the withFormat method is applied on
TableEnvironment object on this page.

All the best

François

2018-08-28 4:37 GMT+02:00 vino yang <yanghua1...@gmail.com>:

Hi Francois,

Yes, the withFormat API comes from an instance of
BatchTableDescriptor, and the BatchTableDescriptor
instance is returned by the connect API, so you should
call BatchTableEnvironment#connect first.

Thanks, vino.

françois lacombe <francois.laco...@dcbrain.com> wrote on Mon, Aug 27, 2018 at 10:26 PM:

Hi all,

I'm currently trying to load a CSV file content with
Flink 1.6.0 table API.
This error is raised as a try to execute the code
written in docs

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format

ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv =
TableEnvironment.getTableEnvironment(env);
tEnv.withFormat(new Csv(...));

> Exception in thread "main" java.lang.Error:
Unresolved compilation problem:
   The method withFormat(Csv) is undefined for the
type BatchTableEnvironment

Am I wrong?

Thanks in advance for any hint

François






Re: History Server in Kubernetes

2018-08-30 Thread Till Rohrmann
Hi Encho,

currently, the existing image does not support starting a HistoryServer.
The reason is simply that it has not been exposed, even though the image
contains everything needed. In order to do this, you would need to extend
the docker-entrypoint.sh script with an additional history-server option.
It could look like the following:

```
if [ "${CMD}" == "${TASK_MANAGER}" ]; then
exec $FLINK_HOME/bin/taskmanager.sh start-foreground "$@"
elif [ "${CMD}" == "history-server" ]; then
exec $FLINK_HOME/bin/historyserver.sh start-foreground "$@"
else
exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
fi
```
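
Assuming the image is rebuilt with that change, a Kubernetes Deployment could
then start the history server by passing the new command as the container
argument, roughly like this (the image name is a placeholder; 8082 is the
default historyserver.web.port as far as I know):

```
containers:
  - name: flink-history-server
    image: my-registry/flink:1.6.0   # your rebuilt image
    args: ["history-server"]
    ports:
      - containerPort: 8082          # historyserver.web.port
```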

Do you want to create a JIRA issue for that and contribute it?

Cheers,
Till

On Thu, Aug 30, 2018 at 9:04 AM Encho Mishinev 
wrote:

> Hello,
>
> I am struggling to find how to run a history server in Kubernetes. The
> docker image takes an argument that starts a jobmanager or a taskmanager,
> but no history server. What's the best way to set up one in K8S?
>
> Thanks,
> Encho
>


Re: Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

2018-08-30 Thread Till Rohrmann
Hi Subramanya,

in order to help you I need a little bit of context. Which version of Flink
are you running? The configuration yarn.reallocate-failed has been deprecated
since Flink 1.1 and no longer has any effect.

What would be helpful is to get the full jobmanager log from you. If the
YarnFlinkResourceManager gets notified that a container has failed, it
should restart this container (it will do this 145 times). So if the
YarnFlinkResourceManager does not get notified about a completed container,
then this might indicate that the container is still running. So it would
be good to check what the logs of container_e27_1535135887442_0906_01_39
say.

Moreover, do you see the same problem occur when using the latest release
Flink 1.6.0?

Cheers,
Till

On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh 
wrote:

> Hi, we are seeing a weird issue where one TaskManager is lost and then
> never re-allocated and subsequently operators fail with
> NoResourceAvailableException and after 5 restarts (we have FixedDelay
> restarts of 5) the application goes down.
>
>- We have explicitly set *yarn.reallocate-failed: *true and have not
>specified the yarn.maximum-failed-containers (and see
>“org.apache.flink.yarn.YarnApplicationMasterRunner - YARN
>application tolerates 145 failed TaskManager containers before giving up”
>in the logs).
>- After the initial startup where all 145 TaskManagers are requested I
>never see any logs saying “Requesting new TaskManager container” to
>reallocate failed container.
>
>
> 2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
>   - Detected unreachable: [akka.tcp://
> fl...@blahabc.sfdc.net:123]
>
> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>   - Task manager akka.tcp://
> fl...@blahabc.sfdc.net:123/user/taskmanager terminated.
>
> java.lang.Exception: TaskManager was lost/killed:
> container_e27_1535135887442_0906_01_39 @ blahabc.sfdc.net
> (dataPort=124)
>
> at
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>
> at
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>
> at
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>
> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>
> at
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org
> $apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>
> at
> org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
> at
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>
> at
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> java.lang.Exception: TaskManager was lost/killed:
> container_e27_1535135887442_0906_01_39 @ blahabc.sfdc.net:42414
> (dataPort=124)
>
> at
> 

Re: withFormat(Csv) is undefined for the type BatchTableEnvironment

2018-08-30 Thread Till Rohrmann
Hi François,

as Vino said, the BatchTableEnvironment does not provide a `withFormat`
method. Admittedly, the documentation does not state it too explicitly but
you can only call the `withFormat` method on a table connector as indicated
here [1]. If you think that you need to get the data from somewhere first
before defining a format, then it becomes clear that you first need to
define a connector.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#table-formats

Cheers,
Till

On Thu, Aug 30, 2018 at 4:46 AM vino yang  wrote:

> Hi francois,
>
> Maybe you can refer to the comments of this source code?[1]
>
>
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala#L143
>
> Thanks, vino.
>
> françois lacombe wrote on Wed, Aug 29, 2018 at 10:54 PM:
>
>> Hi Vino,
>>
>> Thanks for this answer.
>> I can't find in the docs where it's about BatchTableDescriptor
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format
>>
>> It sounds like the withFormat method is applied on TableEnvironment
>> object on this page.
>>
>> All the best
>>
>> François
>>
>> 2018-08-28 4:37 GMT+02:00 vino yang :
>>
>>> Hi Francois,
>>>
>>> Yes, the withFormat API comes from an instance of BatchTableDescriptor,
>>> and the BatchTableDescriptor instance is returned by the connect API, so
>>> you should call BatchTableEnvironment#connect first.
>>>
>>> Thanks, vino.
>>>
>>> françois lacombe wrote on Mon, Aug 27, 2018 at 10:26 PM:
>>>
 Hi all,

 I'm currently trying to load a CSV file content with Flink 1.6.0 table
 API.
 This error is raised as a try to execute the code written in docs

 https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format

 ExecutionEnvironment env =
 ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
 tEnv.withFormat(new Csv(...));

 > Exception in thread "main" java.lang.Error: Unresolved compilation
 problem:
The method withFormat(Csv) is undefined for the type
 BatchTableEnvironment

 Am I wrong?

 Thanks in advance for any hint

 François

>>>
>>


Re: checkpoint timeout

2018-08-30 Thread Till Rohrmann
Hi John,

which version of Flink are you using? I just tried it out with the current
snapshot version and I could configure the checkpoint timeout via

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointTimeout(1337L);

Could you provide us the logs and the application code you are running?

Cheers,
Till

On Thu, Aug 30, 2018 at 4:23 AM vino yang  wrote:

> Hi John,
>
> Setting the checkpoint timeout is through this API. The default timeout
> for checkpoints is 10 minutes [1], not one minute. So, I think it must be
> something else.
> You can set the log level of JM and TM to Debug, and then see more
> checkpoint details. If there is no way to analyze it, you can share your
> log on this mailing list, if you can.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html
>
> Thanks, vino.
>
John O wrote on Thu, Aug 30, 2018 at 2:37 AM:
>
>> I have a flink job with a big enough state that makes checkpointing long
>> ( ~ 70 seconds).
>>
>> I have configured the checkpoint timeout to 180 seconds
>> (setCheckpointTimeout(18))
>>
>> But as you can see from the following logs, timeout seems to be ~60
>> seconds.
>>
>> Is there another timeout configuration I need to set?
>>
>> 2018-08-29 11:41:03,883 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>> checkpoint 61 @ 1535542863734 for job aae565f4f6efba50d5252fc1afd7c255.
>>
>> 2018-08-29 11:42:03,883 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint
>> 61 of job aae565f4f6efba50d5252fc1afd7c255 expired before completing.
>>
>> 2018-08-29 11:42:13,955 WARN
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received
>> late message for now expired checkpoint attempt 61 from
>> f022ce60f0f5da2a34290574305813d8 of job aae565f4f6efba50d5252fc1afd7c255.
>>
>>
>>
>>
>>
>> Jo
>>
>


Re: Problem with querying state on Flink 1.6.

2018-08-30 Thread Till Rohrmann
Hi Joe,

it looks as if the queryable state server binds to the local loopback
address. This looks like a bug to me. Could you maybe share the complete
cluster entrypoint and the task manager logs with me?

In the meantime you could try to do the following: Change
AbstractServerBase.java:227 into `.localAddress(port)`. This should bind to
any local address. Now you need to build your own Flink distribution by
running `mvn clean package -DskipTests` and then go to either build-target
or flink-dist/target/flink-1.7-SNAPSHOT-bin/flink-1.7-SNAPSHOT to find the
distribution.

Cheers,
Till

On Thu, Aug 30, 2018 at 12:12 AM Joe Olson  wrote:

> I'm having a problem with querying state on Flink 1.6.
>
> I put a project in Github that is my best representation of the very
> simple client example outlined in the 'querying state' section of the 1.6
> documentation at
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
> . The Github project is at https://github.com/jolson787/qs
>
> My problem: I know the query server and proxy server have started on my 1
> job manager / 1 task manager Flink 1.6 test rig, because I see the 'Started
> Queryable State Server' and 'Started Queryable State Proxy Server' in the
> task manager logs. I know the ports are open on the local machine, because
> I can telnet to them.
>
> From a remote machine, I implemented the QueryableStateClient as in the
> example, and made a getKVState call. Nothing I seem to do between that or
> the getKVstate call seems to register...no response, no errors thrown, no
> lines in the log, no returned futures, no timeouts, etc. I know the proxy
> server and state server ports are NOT open to the remote machine, yet the
> client still doesn't seem to react.
>
> Can someone take a quick look at my very simple Github project and see if
> anything jumps out at them? Beer is on me at Flink Forward if someone can
> help me work through this
>


History Server in Kubernetes

2018-08-30 Thread Encho Mishinev
Hello,

I am struggling to find how to run a history server in Kubernetes. The
docker image takes an argument that starts a jobmanager or a taskmanager,
but no history server. What's the best way to set up one in K8S?

Thanks,
Encho


Job goes down after 1/145 TMs is lost (NoResourceAvailableException)

2018-08-30 Thread Subramanya Suresh
Hi, we are seeing a weird issue where one TaskManager is lost and then
never re-allocated; subsequently, operators fail with
NoResourceAvailableException, and after 5 restarts (we have FixedDelay
restarts of 5) the application goes down.

   - We have explicitly set *yarn.reallocate-failed: *true and have not
   specified the yarn.maximum-failed-containers (and see
   “org.apache.flink.yarn.YarnApplicationMasterRunner - YARN
   application tolerates 145 failed TaskManager containers before giving up”
   in the logs).
   - After the initial startup where all 145 TaskManagers are requested I
   never see any logs saying “Requesting new TaskManager container” to
   reallocate failed container.


2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
- Detected unreachable: [akka.tcp://
fl...@blahabc.sfdc.net:123]

2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
- Task manager akka.tcp://
fl...@blahabc.sfdc.net:123/user/taskmanager terminated.

java.lang.Exception: TaskManager was lost/killed:
container_e27_1535135887442_0906_01_39 @ blahabc.sfdc.net (dataPort=124)

at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)

at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)

at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at
akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)

at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)

at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)

at akka.actor.ActorCell.invoke(ActorCell.scala:494)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

java.lang.Exception: TaskManager was lost/killed:
container_e27_1535135887442_0906_01_39 @ blahabc.sfdc.net:42414
(dataPort=124)

at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)

at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)

at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)

at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)

at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)

at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)

at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)

at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at