Flink(1.5.0) SQL stream non-windowed inner join

2018-06-13 Thread 徐林彬
Hi all
I'm trying out the Flink SQL stream non-windowed inner join with Flink 1.5.0, 
but it fails. 
Then I tried the Flink test 
case (flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.testNonWindowInnerJoin)
 with Java instead of Scala, but it fails too.
The problem is that the program throws a TableException saying it cannot generate a 
valid execution plan for the given query, and that the query uses an unsupported 
SQL feature.
Is this a bug? How can I solve this problem?
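
For reference, this is roughly the shape of what I'm running (a simplified sketch; 
streamA/streamB, the table names and the field names are placeholders, not my real code):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    // register two streams as tables (field lists are illustrative)
    tEnv.registerDataStream("A", streamA, "a1, a2");
    tEnv.registerDataStream("B", streamB, "b1, b2");

    // non-windowed inner join, i.e. no time constraint in the WHERE clause
    Table result = tEnv.sqlQuery("SELECT a1, b2 FROM A, B WHERE a1 = b1");

    // a non-windowed join produces updates, so I convert to a retract stream
    tEnv.toRetractStream(result, Row.class).print();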


Thanks


Emerson




 

Blobstore exceptions.

2018-06-13 Thread Lasse Nedergaard
Hi.

We sometimes see a job fail with a blob store exception like the one below.
Does anyone have an idea why we get these, and how to avoid them?
In this case the job had run without any problems for a week before we got
the error. Only this job is affected right now (all others are running as
expected), and next time it can be one of the other jobs that gets the
exception.

We are running Flink 1.4.2 on an AWS EMR cluster, but we have seen the same
problem on 1.3.2 too.

Any ideas?

java.io.IOException: Failed to fetch BLOB ff5d324719fb4caf3a0dba3fbcfa795e/p-812d84ea013302dbd24da1d32e732cc01582dabc-3198b6f63d293d2756f4cf5b8eebe7a2 from ip-10-1-1-192.eu-west-1.compute.internal/10.1.1.192:46781 and store it under /tmp/blobStore-3e90d7b0-2f40-4e28-b2b0-01d9ba96ac55/incoming/temp-0173
    at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:191)
    at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:177)
    at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:205)
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:119)
    at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:878)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:589)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: GET operation failed: Server side error: /tmp/blobStore-a83b8ca6-c01a-496a-8997-31687f37b95d/incoming/temp-00049050
    at org.apache.flink.runtime.blob.BlobClient.getInternal(BlobClient.java:253)
    at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:166)
    ... 6 more
Caused by: java.io.IOException: Server side error: /tmp/blobStore-a83b8ca6-c01a-496a-8997-31687f37b95d/incoming/temp-00049050
    at org.apache.flink.runtime.blob.BlobClient.receiveAndCheckGetResponse(BlobClient.java:306)
    at org.apache.flink.runtime.blob.BlobClient.getInternal(BlobClient.java:247)
    ... 7 more
Caused by: java.nio.file.NoSuchFileException: /tmp/blobStore-a83b8ca6-c01a-496a-8997-31687f37b95d/incoming/temp-00049050
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
    at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
    at java.nio.file.Files.move(Files.java:1395)
    at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:452)
    at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:521)
    at org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:231)
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117)


Re: Exception while submitting jobs through Yarn

2018-06-13 Thread Garvit Sharma
Can someone please tell me why I am facing this?

On Wed, Jun 13, 2018 at 10:33 PM Garvit Sharma  wrote:

> Hi,
>
> I am using *flink-1.5.0-bin-hadoop27-scala_2.11 *to submit jobs through
> Yarn, but I am getting the below exception :
>
> java.lang.NoClassDefFoundError:
> com/sun/jersey/core/util/FeaturesAndProperties
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> at
> org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
>
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
>
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
>
> at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
>
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:966)
>
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:269)
>
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:444)
>
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:92)
>
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:221)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>
> Caused by: java.lang.ClassNotFoundException:
> com.sun.jersey.core.util.FeaturesAndProperties
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
>
> Command : HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink run -m yarn-cluster
> -yd -yn 2 -ys 20 -yjm 10240 -ytm 10240 -yst -ynm test -yqu default -p 20
> test.jar
>
> The class *com/sun/jersey/core/util/FeaturesAndProperties* is already
> present in the test.jar so not sure why am I getting this exception.
>
> Please check and let me know.
>
> Thanks,
> --
>
> Garvit Sharma
> github.com/garvitlnmiit/
>
> No Body is a Scholar by birth, its only hard work and strong determination
> that makes him master.
>


-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Re:Checkpoint/ Savepoint usage

2018-06-13 Thread sihua zhou
Hi Rinat,


> are my assumptions about checkpoint/ savepoint state usage correct ?


Indeed, they are a bit incorrect: you can also restore a job from a checkpoint. By 
default, the checkpoint data is removed when the job finishes (e.g. is canceled 
by the user), but you can configure Flink to retain the checkpoint when the job 
finishes. That way, if you cancel the job, the checkpoint is retained and 
you can restore your job from it later. You can refer to 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html#retained-checkpoints
 for more information.
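
A minimal sketch of what I mean (assuming the Flink 1.5 Java API; the interval is 
just an example):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // checkpoint every 60 s, for example
    // keep the latest checkpoint around when the job is canceled
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

You can then resume from the retained checkpoint path the same way as from a 
savepoint, e.g. bin/flink run -s <path-to-retained-checkpoint> ...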


> when I’m creating a savepoint, only hdfs could be used as a backend?


No. As long as all of Flink's components (TMs and JM) can access the target 
storage, you can create a savepoint there. For example S3, or even the 
local file system (if your TMs and JM run on a single machine). 


> when I’m using RocksDB, it could only be used as a checkpointing backend, and 
> when I’ll decide to create savepoint, it’ll be stored in hdfs ?


I think there may be a little misunderstanding here: the RocksDB backend is used for 
storing keyed state, just like the heap backend; it doesn't determine 
the checkpoint backend at all.


> do we have any ability to configure the job, to use last checkpoint as a 
> starting state out of the box ?


No, that is currently unsupported. I think you have hit an issue that is 
currently under discussion; it's a bit tricky to do for several reasons. You 
can refer to https://issues.apache.org/jira/browse/FLINK-9043 for more 
information.


Best, Sihua


On 06/14/2018 03:54,Rinat wrote:
Hi mates, while using BucketingSink I've decided to enable checkpointing, to 
prevent files from hanging in the open state on job failure.
But it seems that I haven't properly understood the meaning of checkpointing …


I’ve enabled the fs backend for checkpoints, and while the job is running 
everything works fine: a file with the state is created, and if I kill the 
taskmanager, the state is restored.
But when I kill the whole job and run it again, the state from the last 
checkpoint isn't used, and a new state is created instead.


If I understood properly, the checkpointed state is used by the job manager while 
the job is running, and if I want to cancel/kill the job, I should use 
savepoints.


So I got the following questions:


are my assumptions about checkpoint/ savepoint state usage correct ?
when I’m creating a savepoint, only hdfs could be used as a backend ?
when I’m using RocksDB, it could only be used as a checkpointing backend, and 
when I’ll decide to create savepoint, it’ll be stored in hdfs ?
do we have any ability to configure the job, to use last checkpoint as a 
starting state out of the box ?


Sincerely yours,

Rinat Sharipov
Software Engineer at 1DMP CORE Team


email: r.shari...@cleverdata.ru
mobile: +7 (925) 416-37-26


CleverDATA
make your data clever



Re: IoT Use Case, Problem and Thoughts

2018-06-13 Thread Ashish Pokharel
Hi Fabian,

Thanks for the prompt response and apologies for delayed response. 

You wrapped up the bottom lines pretty well. If I were to wrap it up, I'd say 
"best possible" recovery on "known" restarts, either manual cancel + start 
OR framework-initiated ones such as on operator failures, with these constraints:
 - some data loss is OK
 - avoid periodic checkpoints, as the states are really transient (less than 5 
seconds of lifetime, if not milliseconds) and almost all events make it to 
state. I do understand that checkpointing performance has been drastically 
improved and that, with the async and RocksDB options, it should technically not add 
latency to the application, etc. However, I feel that even with those improvements and 
local checkpointing (which we are already doing) it is a lot of "unused" 
IOPS/resource utilization, especially if we start to spin up more apps handling 
similar data sources with similar requirements. At first blush it feels 
like those resources are better utilized in the cluster for apps with stricter SLAs 
for data loss and recovery, instead.

Basically, I suppose I am thinking of a checkpointing feature that is initiated by 
certain actions/events rather than running periodically. Let me know if I am off-base 
here and I should just enable checkpointing in all of these apps and move on :) 

I tried a savepoint again, and it looks like the issue is caused by the in-memory 
states being large: it throws an error saying the states are larger than a certain 
size. So the solution to (1) will possibly solve (2) as well. 
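
For reference, the change I'm planning to try, since the MemoryStateBackend backs 
up into the JobManager heap: switch to a file-system state backend. A rough sketch, 
with a placeholder path:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // keep working state on the heap, but write checkpoints/savepoints to HDFS
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));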

Thanks again,

Ashish


> On Jun 7, 2018, at 4:25 PM, Fabian Hueske  wrote:
> 
> Hi Ashish,
> 
> Thanks for the great write up. 
> If I understood you correctly, there are two different issues that are caused 
> by the disabled checkpointing.
> 
> 1) Recovery from a failure without restarting all operators to preserve the 
> state in the running tasks
> 2) Planned restarts of an application without losing all state (even with 
> disabled checkpointing).
> 
> Ad 1) The community is constantly working on reducing the time for 
> checkpointing and recovery. 
> For 1.5, local task recovery was added, which basically stores a state copy 
> on the local disk which is read in case of a recovery. So, tasks are 
> restarted but read the state to restore from the local disk instead of 
> from distributed storage.
> AFAIK, this can only be used together with remote checkpoints. I think this 
> might be an interesting option for you if it would be possible to write 
> checkpoints only to local disk and not remote storage. AFAIK, there are also 
> other efforts to reduce the number of restarted tasks in case of a failure. I 
> guess, you've played with other features such as RocksDBStateBackend, 
> incremental and async checkpoints already. 
> 
> Ad 2) It sounds as if savepoints are exactly the feature you are looking 
> for. It would be good to know what exactly did not work for you. The 
> MemoryStateBackend is not suitable for large state sizes because it backs up 
> into the heap memory of the JobManager. 
> 
> Best, Fabian
> 
> 2018-06-05 21:57 GMT+02:00 ashish pok  >:
> Fabian, Stephan, All,
> 
> I started a discussion a while back around having a form of event-based 
> checkpointing policy that will help us in some of our high volume data 
> pipelines. Here is an effort to put this in front of community and understand 
> what capabilities can support these type of use cases, how much others feel 
> the same need and potentially a feature that can make it to a user story.
> 
> Use Case Summary:
> - Extremely high volume of data (events from consumer devices with customer 
> base of over 100M)
> - Multiple events need to be combined using a windowing streaming app grouped 
> by keys (something like 5 min floor of timestamp and unique identifiers for 
> customer devices)
> - "Most" events by a group/key arrive in few seconds if not milliseconds 
> however events can sometimes delay or get lost in transport (so delayed event 
> handling and timeouts will be needed)
> - Extremely low (pretty vague but hopefully details below clarify it more) 
> data loss is acceptable
> - Because of the volume and transient nature of source, checkpointing is 
> turned off (saves on writes to persistence as states/sessions are active for 
> only few seconds during processing)
> 
> Problem Summary:
> Of course, none of the above is out of the norm for Flink and as a matter of 
> fact we already have a Flink app doing this. The issue arises when it comes 
> to graceful shutdowns and on operator failures (eg: Kafka timeouts etc.) On 
> operator failures, entire job graph restarts which essentially flushes out 
> in-memory states/sessions. I think there is a feature in the works (not sure if 
> it made it to 1.5) to perform selective restarts which will control the 
> damage but still will result in data loss. Also, it doesn't help when 
> application restarts are needed. We did try going savepoint route for 

Re: What does the number in front of ">" mean when I print a DataStream

2018-06-13 Thread Hequn Cheng
Hi Chris,

It means there are four parallel threads (subtasks), and the number identifies
which thread emitted the record. You can use env.setParallelism() to change the
default value (i.e., 4) to other values.
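
For example, a small sketch reusing the names from your snippet (change the
default for the whole job, or only for the print sink):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);            // job-wide default, output prefix is always "1>"

    // or only for the sink:
    temps.print().setParallelism(1);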

Best, Hequn

On Thu, Jun 14, 2018 at 9:09 AM, chrisr123  wrote:

>
> What does the number in front of the ">" character  mean when call print()
> on a dataset?
>
> For example I may have this in my source where I am reading a socket stream
> of sensor data:
>
> DataStream> simpleStream = env
> .socketTextStream(parms.get("host"),
> parms.getInt("port"))
> .map(new SensorParser());
>
> DataStream> temps = simpleStream
> .keyBy(0)
> .window(TumblingProcessingTimeWindows.
> of(Time.seconds(10)))
> .process(new MyProcessWindowFunction(
> appStartTime));
> temps.print();
>
> When I look at the output, I see something like:
>
> 1> (sensor5,98.64)
> 3> (sensor8,81.31)
> 2> (sensor3,69.55)
> 4> (sensor0,82.86)
>
> What do numbers 1,3,2,4 mean, i.e. 1>, 3>, 2>, 4> on the lines above?
> Thanks
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


What does the number in front of ">" mean when I print a DataStream

2018-06-13 Thread chrisr123


What does the number in front of the ">" character mean when I call print()
on a DataStream?

For example I may have this in my source where I am reading a socket stream
of sensor data:

DataStream<Tuple2<String, Double>> simpleStream = env
        .socketTextStream(parms.get("host"), parms.getInt("port"))
        .map(new SensorParser());

DataStream<Tuple2<String, Double>> temps = simpleStream
        .keyBy(0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new MyProcessWindowFunction(appStartTime));
temps.print();

When I look at the output, I see something like:

1> (sensor5,98.64)
3> (sensor8,81.31)
2> (sensor3,69.55)
4> (sensor0,82.86)

What do the numbers 1, 3, 2, 4 (i.e. the 1>, 3>, 2>, 4> prefixes) mean on the lines above?
Thanks





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Metric name collision

2018-06-13 Thread rharlin
Right after I sent this, I realized that FLINK-7502 is likely the fix that
I'm looking for.  I swapped in a more recent version of the
flink-metrics-prometheus jar and it seems to be much happier now.

Thanks,

Russell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Windows support

2018-06-13 Thread Chesnay Schepler

Which scripts did you try?

I use Windows 10 as well and can run the .bat scripts with powershell 
and the .sh scripts in WSL just fine.


We did rework the Windows scripts in 1.5, the primary change being 
separate processes for the Job- and TaskManager.


On 13.06.2018 21:04, TechnoMage wrote:

Has any work been done on support for Windows in 1.5?  I tried the scripts in 
1.4 with windows 10 with no luck.

Michael






Metric name collision

2018-06-13 Thread Russell Harlin
Hi,

I'm trying to add some custom metrics for a Flink job, but have bumped into
some issues using the PrometheusReporter.  If I'm running multiple
instances of the same job under the same TaskManager, I'm seeing the
following error when the second instance of the job tries to create the
metric with the same name:

2018-06-13 11:17:42,512 ERROR
org.apache.flink.runtime.metrics.MetricRegistry   - Error while
registering metric.
java.lang.IllegalArgumentException: Collector already registered that
provides name: flink_taskmanager_job_task_operator_myMetric
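
For context, the metric is registered along these lines (a simplified sketch; the
class and metric names are placeholders for what the job actually does):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    public class MyMapper extends RichMapFunction<String, String> {
        private transient Counter myMetric;

        @Override
        public void open(Configuration parameters) {
            // registered per operator instance; the name collides across job instances
            myMetric = getRuntimeContext().getMetricGroup().counter("myMetric");
        }

        @Override
        public String map(String value) {
            myMetric.inc();
            return value;
        }
    }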

This is preventing the metric from being created properly.  I can work
around this by putting the task_attempt_id or some other uuid in the metric
name to avoid the collision, but this causes extra clutter and orphaned
metrics if the job restarts.  Has anyone else run into this?  Is there a
better approach for handling it?

Thanks,

Russell


Re: compile and package connectors and examples

2018-06-13 Thread Chesnay Schepler

I've already responded to your previous mail asking the same question.

On 13.06.2018 19:06, Chris Kellogg wrote:

How can one build a connectors jar from the source?

Also, is there a quick way to build the examples from the source 
without having to do a mvn clean package -DskipTests?



Thanks.
Chris





Re: [BucketingSink] notify on moving into pending/ final state

2018-06-13 Thread Rinat
Hi guys, thx for your reply.

The following code references are valid for the release-1.5.0 tag, class 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

Currently, BucketingSink has the following file lifecycle.

When moving files from the opened into the pending state:
- on each item (method invoke, line 434), we check that a suitable bucket exists and 
contains an opened file; if an opened file doesn't exist, we create one 
and write the item to it
- on each item (method invoke, line 434), we check whether the opened file 
exceeds the limits; if the limits are exceeded, we close it and move it 
into the pending state using closeCurrentPartFile (line 568), a private method
- on each timer callback (onProcessingTime, line 482), we check whether items haven't 
been added to the opened file for longer than the specified period of time; if so, we 
close it using the same private method closeCurrentPartFile (line 588)

So the only hook point we have is closeCurrentPartFile, which is private, so we 
copy-pasted the current implementation and injected our logic there.


Files are moved from the pending state into the final state during the checkpointing 
lifecycle, in notifyCheckpointComplete (line 657), which is public but contains a lot of 
logic, including discovery of files in the pending state, synchronization of state 
access and its modification, etc … 

So we couldn't override it, or call the super method and add some logic, because 
when the current implementation changes the state of the files, it removes them from 
the state, and we have no way of knowing 
for which files the state has been changed.

To solve this problem, we've created the following interface:

/**
 * The {@code FileStateChangeCallback} is used to perform any additional operations
 * when {@link BucketingSink} moves a file from one state to another. For more
 * information about the state management of {@code BucketingSink}, look through
 * its official documentation.
 */
public interface FileStateChangeCallback extends Serializable {

    /**
     * Used to perform any additional operations related to moving a file into
     * its next state.
     *
     * @param fs provides access for working with the file system
     * @param path path to the file that was moved into the next state
     *
     * @throws IOException if something went wrong while performing any
     *                     operations with the file system
     */
    void call(FileSystem fs, Path path) throws IOException;
}
And we have added the ability to register these callbacks on the BucketingSink 
implementation in the following manner:

public BucketingSink<T> registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
public BucketingSink<T> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
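
To make the intent concrete, a usage sketch of the proposed API (this is our
extension, not part of Flink; writeMetaFile and LOG are placeholders from our code):

    BucketingSink<String> sink = new BucketingSink<String>("/base/path")
            .registerOnPendingStateChangeCallback(
                    (fs, path) -> LOG.info("moved to pending: {}", path))
            .registerOnFinalStateChangeCallback(
                    (fs, path) -> writeMetaFile(fs, path));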

I'm ready to discuss the best way such hooks could be implemented in the 
core implementation, or any other improvement that would help us add such 
functionality in our extension using the public API instead of copy-pasting the 
source code.

Thx for your help, mates =)


> On 11 Jun 2018, at 11:37, Piotr Nowojski  wrote:
> 
> Hi,
> 
> I see that could be a useful feature. What exactly is now preventing you from 
> inheriting from BucketingSink? Maybe it would be just enough to make the 
> BucketingSink easier to extend.
> 
> One thing now that could collide with such feature is that Kostas is now 
> working on larger BucketingSink rework/refactor. 
> 
> Piotrek
> 
>> On 8 Jun 2018, at 16:38, Rinat > > wrote:
>> 
>> Hi mates, I got a proposal about functionality of BucketingSink.
>> 
>> During implementation of one of our tasks we got the following need - create 
>> a meta-file, with the path and additional information about the file, 
>> created by BucketingSink, when it’s been moved into final place.
>> Unfortunately such behaviour is currently not available for us. 
>> 
>> We’ve implemented our own Sink, that provides an opportunity to register 
>> notifiers, that will be called, when file state is changing, but current API 
>> doesn’t allow us to add such behaviour using inheritance ...
>> 
>> It seems, that such functionality could be useful, and could be a part of 
>> BucketingSink API
>> What do you sink, should I make a PR ?
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru 
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Checkpoint/ Savepoint usage

2018-06-13 Thread Rinat
Hi mates, while using BucketingSink I've decided to enable checkpointing, to 
prevent files from hanging in the open state on job failure.
But it seems that I haven't properly understood the meaning of checkpointing …

I’ve enabled the fs backend for checkpoints, and while the job is running 
everything works fine: a file with the state is created, and if I kill the 
taskmanager, the state is restored.
But when I kill the whole job and run it again, the state from the last 
checkpoint isn't used, and a new state is created instead.

If I understood properly, the checkpointed state is used by the job manager while 
the job is running, and if I want to cancel/kill the job, I should use 
savepoints.

So I got the following questions:

are my assumptions about checkpoint/ savepoint state usage correct ?
when I’m creating a savepoint, only hdfs could be used as a backend ?
when I’m using RocksDB, it could only be used as a checkpointing backend, and 
when I’ll decide to create savepoint, it’ll be stored in hdfs ?
do we have any ability to configure the job, to use last checkpoint as a 
starting state out of the box ?

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: A question about Kryo and Window State

2018-06-13 Thread Vishal Santoshi
Any ideas on the standard way (or any roundabout way) of doing a version
upgrade that is backward compatible?
The @FieldSerializer.Optional("0") annotation actually ignores the field (even
if it is set), giving it the default value when Kryo is used. That is simply how
the FieldSerializer behaves. There is another serializer (CompatibleFieldSerializer,
I believe) that allows for such backward-compatible changes.
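
A sketch of what I'm considering (untested; I haven't verified that this is enough
for a savepoint written with the old FieldSerializer to be readable afterwards):

    // tell Flink to serialize Class A with Kryo's CompatibleFieldSerializer,
    // which tolerates added/removed fields
    env.getConfig().registerTypeWithKryoSerializer(
            A.class, com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer.class);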


I know some work is being done in 1.6 to allow for the above use case, and I
think Google Dataflow does provide some avenues.

Thanks much

Vishal



On Tue, Jun 12, 2018 at 11:30 PM, Vishal Santoshi  wrote:

> I have a running pipe with Window State in a class say
>
> Class A{
>  long a;
> }
>
> It uses the default KryoSerializer
>
> I want to add a field to
>
> Class A {
>   long a;
>   long b;
> }
>
> I need to suspend with SP and resume with the new version of Class A
>
>
> Is there a definite way to do this. I tried
>
> Class A {
>   long a;
>@FieldSerializer.Optional("0")
>   long b;
> }
>
> but that seems to default to 0 , even when the Aggregation is putting in
> values.
>
> Could somebody give pointers as to how to solve this
>
> Thanks a ton.
>
>
>
>


Windows support

2018-06-13 Thread TechnoMage
Has any work been done on support for Windows in 1.5?  I tried the scripts in 
1.4 on Windows 10 with no luck.

Michael



compile and package connectors and examples

2018-06-13 Thread Chris Kellogg
How can one build a connectors jar from the source?

Also, is there a quick way to build the examples from the source without
having to do a mvn clean package -DskipTests?
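
For reference, the kind of module-scoped build I've been hoping for, roughly (the
exact module paths are guesses on my part; -pl/-am are standard Maven flags):

    mvn clean install -pl flink-connectors/flink-connector-kafka-0.11 -am -DskipTests
    mvn clean package -pl flink-examples/flink-examples-streaming -am -DskipTests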


Thanks.
Chris


Exception while submitting jobs through Yarn

2018-06-13 Thread Garvit Sharma
Hi,

I am using *flink-1.5.0-bin-hadoop27-scala_2.11* to submit jobs through
Yarn, but I am getting the below exception:

java.lang.NoClassDefFoundError:
com/sun/jersey/core/util/FeaturesAndProperties

at java.lang.ClassLoader.defineClass1(Native Method)

at java.lang.ClassLoader.defineClass(ClassLoader.java:763)

at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)

at java.net.URLClassLoader.access$100(URLClassLoader.java:73)

at java.net.URLClassLoader$1.run(URLClassLoader.java:368)

at java.net.URLClassLoader$1.run(URLClassLoader.java:362)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:361)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

at
org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)

at
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)

at
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)

at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)

at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:966)

at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:269)

at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:444)

at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:92)

at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:221)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)

at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)

Caused by: java.lang.ClassNotFoundException:
com.sun.jersey.core.util.FeaturesAndProperties

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)


Command : HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink run -m yarn-cluster
-yd -yn 2 -ys 20 -yjm 10240 -ytm 10240 -yst -ynm test -yqu default -p 20
test.jar

The class *com/sun/jersey/core/util/FeaturesAndProperties* is already
present in test.jar, so I am not sure why I am getting this exception.

Please check and let me know.

Thanks,
-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Dealing with an asynchronous source (and sink) in Flink 1.5.0. Await.Result() does not complete.

2018-06-13 Thread Niels van Kaam
Hi All,

I have implemented a custom SourceFunction on a data source with an
asynchronous API (the API calls return Scala futures). I need to perform
calls to the asynchronous API during initialization of each individual
(parallel) source instance, and, when in exactly-once mode, also during
snapshotState or inside the run loop. The polling loop itself is
synchronous.

Since I am (at least for now) not worried about performance, I just used
Await.result() to perform a blocking wait on each asynchronous call (
https://docs.scala-lang.org/overviews/core/futures.html#blocking-outside-the-future).
This worked fine so far in Flink 1.4.2, but when I upgrade to Flink 1.5 the
futures never complete (Eventually causing timeout exceptions on the
await.result call).

The issue occurs in integration tests where the Flink jobs run locally (in
a minicluster). The issue does not occur on my local machines, but does so
consistently on Travis. I therefore suspect the issue is related to the
number of cores/workers that are available. Await.result however uses the
blockingContext, which is backed by the forkjoinpool. I do not expect a few
asynchronous calls to run into any limitations there. Compiling and running
the same code with Flink 1.4.2 works fine. The issue occurs both when
performing Await.Result() inside the run loop or inside initializeState().

Am I breaking the process model when using Await.result on asynchronous api
calls within initializeState or snapshotState in a sourcefunction (or Sink
for that matter)? With Await.result I do make sure the calls are created
and awaited within a single checkpoint.

Any other suggestions where to look for the problem, or explanation why
this issue could occur when upgrading from 1.4.2 to 1.5.0?


Thank you for your help!

Cheers,
Niels


Re: flink_taskmanager_job_task_operator_records_lag_max == -Inf on Flink 1.4.2

2018-06-13 Thread Julio Biason
Just to add some more info, here is the data I have on Prometheus (with
some names redacted):

flink_taskmanager_job_task_operator_records_lag_max{host="002s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="0",task_attempt_id="fa104111e1f493bbec6f4b2ce44ec1da",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="0496ee3dc28c7cad5a512b4aefce67fa"}
83529
flink_taskmanager_job_task_operator_records_lag_max{host="002s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="3",task_attempt_id="4c5d8395882fb1ad26bdd6fd7f4e789c",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="0496ee3dc28c7cad5a512b4aefce67fa"}
-Inf
flink_taskmanager_job_task_operator_records_lag_max{host="002s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="6",task_attempt_id="f05e3171c446c19c7b928eeffe0fa52f",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="0496ee3dc28c7cad5a512b4aefce67fa"}
-Inf
flink_taskmanager_job_task_operator_records_lag_max{host="002s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="9",task_attempt_id="9533c5fa9fafadf4878ce90a08f83213",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="0496ee3dc28c7cad5a512b4aefce67fa"}
84096
flink_taskmanager_job_task_operator_records_lag_max{host="003s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="1",task_attempt_id="7ea45523850f5d08bab719418321e410",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="8f68b231855c663ae6b3c2362d39568a"}
83867
flink_taskmanager_job_task_operator_records_lag_max{host="003s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="10",task_attempt_id="cf6c2349ccf818f6870fdf0296be121b",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="8f68b231855c663ae6b3c2362d39568a"}
83829
flink_taskmanager_job_task_operator_records_lag_max{host="003s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="4",task_attempt_id="b39c5366b90d74e57d058b64e9e08e56",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="8f68b231855c663ae6b3c2362d39568a"}
-Inf
flink_taskmanager_job_task_operator_records_lag_max{host="003s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="7",task_attempt_id="db563e7e360227585cff9fa3d0035b0d",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="8f68b231855c663ae6b3c2362d39568a"}
-Inf
flink_taskmanager_job_task_operator_records_lag_max{host="004s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="11",task_attempt_id="4e9231f9187b0dffc728d8cd77cfef9e",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="9098e39a467aa6c255dcf2ec44544cb2"}
83730
flink_taskmanager_job_task_operator_records_lag_max{host="004s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="2",task_attempt_id="e920a1672b0a31c2d186e3f6fee38bed",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="9098e39a467aa6c255dcf2ec44544cb2"}
-Inf
flink_taskmanager_job_task_operator_records_lag_max{host="004s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="5",task_attempt_id="0e22fd213905d4da222e3651e7007106",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="9098e39a467aa6c255dcf2ec44544cb2"}
83472
flink_taskmanager_job_task_operator_records_lag_max{host="004s",operator_id="cbc357ccb763df2852fee8c4fc7d55f2",subtask_index="8",task_attempt_id="f36fe63b0688a821f5abf685551c47fa",task_attempt_num="11",task_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="9098e39a467aa6c255dcf2ec44544cb2"}
83842

What we have are 3 servers running with 4 slots each. Our Kafka topic has 12
partitions and the job is running with a parallelism of 12. In this setup,
I'd expect each slot to grab one partition and process it, giving
me a different lag for each respective partition. But it seems some
slots are stubbornly refusing to either grab a partition or update their
status. From the perspective of someone who doesn't know the code, it doesn't
seem to be related to TaskManagers sharing the same Kafka
connection, as 004 is consuming 3 partitions while 002 and 003 are
consuming just 2.

And I was wrong about the attempt_id: it's not what's messing with my
Prometheus query; it's some slots reporting -Inf for their partitions.

On Wed, Jun 13, 2018 at 9:05 AM, Julio Biason 
wrote:

> Hi Gordon,
>
> We have Kafka 0.10.1.1 running and use the flink-connector-kafka-0.10
> driver.
>
> There are a bunch of flink_taskmanager_job_task_operator_* metrics,
> including some about the committed offset for each partition. It seems I
> have 4 different records_lag_max with different attempt_id, though, 3 with
> -Inf and 1 with a value -- which will give me some more understand of
> Prometheus to extract this properly.
>
> I was also checking our Grafana and the metric we were using was
> "flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max",
> actually. "flink_taskmana

Re: flink_taskmanager_job_task_operator_records_lag_max == -Inf on Flink 1.4.2

2018-06-13 Thread Julio Biason
Hi Gordon,

We have Kafka 0.10.1.1 running and use the flink-connector-kafka-0.10
driver.

There are a bunch of flink_taskmanager_job_task_operator_* metrics,
including some about the committed offset for each partition. It seems I
have 4 different records_lag_max with different attempt_ids, though: 3 with
-Inf and 1 with a value, which means I'll need a bit more understanding of
Prometheus to extract this properly.

I was also checking our Grafana, and the metric we were actually using was
"flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max".
"flink_taskmanager_job_task_operator_records_lag_max" seems to be
new (with the attempt thingy).

Under the "KafkaConsumer" scope, there is only the "commited_offset" for
each partition.

On Wed, Jun 13, 2018 at 5:41 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> Which Kafka version are you using?
>
> AFAIK, the only recent changes to Kafka connector metrics in the 1.4.x
> series would be FLINK-8419 [1].
> The ‘records_lag_max’ metric is a Kafka-shipped metric simply forwarded
> from the internally used Kafka client, so nothing should have been affected.
>
> Do you see other metrics under the pattern of 
> ‘flink_taskmanager_job_task_operator_*’?
> All Kafka-shipped metrics should still follow this pattern.
> If not, could you find the ‘records_lag_max’ metric (or any other
> Kafka-shipped metrics [2]) under the user scope ‘KafkaConsumer’?
>
> The above should provide more insight into what may be wrong here.
>
> - Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-8419
> [2] https://docs.confluent.io/current/kafka/monitoring.html#fetch-metrics
>
> On 12 June 2018 at 11:47:51 PM, Julio Biason (julio.bia...@azion.com)
> wrote:
>
> Hey guys,
>
> I just updated our Flink install from 1.4.0 to 1.4.2, but our Prometheus
> monitoring is not getting the current Kafka lag.
>
> After updating to 1.4.2 and making the symlink between
> opt/flink-metrics-prometheus-1.4.2.jar to lib/, I got the metrics back on
> Prometheus, but the most important one, 
> flink_taskmanager_job_task_operator_records_lag_max
> is now returning -Inf.
>
> Did I miss something?
>
> --
> *Julio Biason*, Sofware Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>
>


-- 
*Julio Biason*, Sofware Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


Stream Join With Early firings

2018-06-13 Thread Johannes Schulte
Hi,

I am joining two streams with a session window and want to emit a joined
(early) result for every element arriving on one of the streams.

Currently the code looks like this:

s1.join(s2)
.where(s1.id).equalTo(s2.id)
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
// trigger(?)
.apply(...custom code..)

What I am missing is the right trigger, a la "withEarlyFiring". Do I need to
implement my own trigger for this, and if yes, what kind of functionality
must be present so as not to break the session window semantics?
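
For reference, this is the rough shape of the custom trigger I have in mind (an
untested sketch: it mimics the default EventTimeTrigger but additionally fires on
every element; I'm not sure yet whether the repeated early firings cause problems
downstream, since the window contents are re-emitted on each FIRE):

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class PerElementEventTimeTrigger extends Trigger<Object, TimeWindow> {

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
            // keep the "final" firing of the default event-time trigger ...
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // ... and additionally emit an early result for every arriving element
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }

        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) {
            // re-register the end-of-window timer for the merged session window
            if (window.maxTimestamp() > ctx.getCurrentWatermark()) {
                ctx.registerEventTimeTimer(window.maxTimestamp());
            }
        }
    }

It would then be plugged in via .trigger(new PerElementEventTimeTrigger()) at the
commented-out spot in the snippet above.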

Thanks in advance,

Johannes


Re: flink_taskmanager_job_task_operator_records_lag_max == -Inf on Flink 1.4.2

2018-06-13 Thread Tzu-Li (Gordon) Tai
Hi,

Which Kafka version are you using?

AFAIK, the only recent changes to Kafka connector metrics in the 1.4.x series 
would be FLINK-8419 [1].
The ‘records_lag_max’ metric is a Kafka-shipped metric simply forwarded from 
the internally used Kafka client, so nothing should have been affected.

Do you see other metrics under the pattern of 
‘flink_taskmanager_job_task_operator_*’? All Kafka-shipped metrics should still 
follow this pattern.
If not, could you find the ‘records_lag_max’ metric (or any other Kafka-shipped 
metrics [2]) under the user scope ‘KafkaConsumer’?

The above should provide more insight into what may be wrong here.

- Gordon

[1] https://issues.apache.org/jira/browse/FLINK-8419
[2] https://docs.confluent.io/current/kafka/monitoring.html#fetch-metrics

On 12 June 2018 at 11:47:51 PM, Julio Biason (julio.bia...@azion.com) wrote:

Hey guys,

I just updated our Flink install from 1.4.0 to 1.4.2, but our Prometheus 
monitoring is not getting the current Kafka lag.

After updating to 1.4.2 and making the symlink between 
opt/flink-metrics-prometheus-1.4.2.jar to lib/, I got the metrics back on 
Prometheus, but the most important one, 
flink_taskmanager_job_task_operator_records_lag_max is now returning -Inf.

Did I miss something?

--
Julio Biason, Sofware Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554

CsvTableSource error handling

2018-06-13 Thread Athar Naved
Hi There,

I am trying to parse multiple CSV files in a directory using
CsvTableSource and insert each row into Cassandra using CassandraSink.

How does Flink handle parse errors in some of the CSV files within that
directory?
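
For reference, this is roughly how I build the source (a simplified sketch; the path
and field names are placeholders). I noticed the builder has an ignoreParseErrors()
option, but I'm not sure how it behaves when some files in the directory cannot be
parsed at all:

    CsvTableSource source = CsvTableSource.builder()
            .path("/data/csv-dir")                        // directory with the CSV files
            .field("id", BasicTypeInfo.STRING_TYPE_INFO)
            .field("value", BasicTypeInfo.DOUBLE_TYPE_INFO)
            .fieldDelimiter(",")
            .ignoreParseErrors()                          // skip rows that fail to parse?
            .build();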

-- 
Thanks & Regards,
Athar


Re: State life-cycle for different state-backend implementations

2018-06-13 Thread Rinat
Hi Sihua, Thx for your reply

> On 9 Jun 2018, at 11:42, sihua zhou  wrote:
> 
> Hi Rinat,
> 
> I think there is one configuration, {{state.checkpoints.num-retained}}, that 
> controls the maximum number of completed checkpoints to retain; the default 
> value is 1. So the risk you mentioned should not happen. Refer to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#checkpointing
> where you can find more checkpointing configuration options.
> 
> Best, Sihua
> 
> 
> On 06/8/2018 22:55,Rinat 
>  wrote: 
> Hi mates, got a question about different state backends.
> 
> As I understand it, on every checkpoint Flink flushes its current 
> state to the backend. In the case of FsStateBackend we’ll have a separate file for 
> each checkpoint, and during the job lifecycle we run the risk of 
> a huge number of state files in HDFS, which is not very good for a Hadoop 
> name-node.
> 
> Does Flink have any clean-up strategies for its state in the different 
> backend implementations? If you could provide any links where I could 
> read more details about this process, that would be awesome ))
> 
> Thx a lot for your help.
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever