How to debug why Flink makes and executes only partial plan

2016-10-14 Thread Satish Chandra Gupta
Hi,

In my Flink program, after a couple of map, union and connect, I have a
final filter and a sink. Something like this (after abstracting out
details):

val filteredEvents: DataStream[NotificationEvent]
  = allThisStuffWorking
  .name("filtered_users")

filteredEvents
  .filter(x => check(x.f1, x.f2, someStuff)) // BUG
  .addSink(new NotificationSinkFunction(notifier))
  .name("send_notification")

The check function returns a Boolean and does not access anything other
than parameters passed. Here is relevant part of Notification Sink Function:

class NotificationSinkFunction(notifier: Notifier)
  extends SinkFunction[NotificationEvent] {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  def invoke(event: NotificationEvent): Unit = {
    LOG.info("Log this notification detail")
    notifier.send(event.f1, event.f2) // BUG
  }
}

If I comment out the lines highlighted and marked with //BUG, the Flink
pipeline works and prints the log messages, and Flink shows this execution
plan at the end:

filtered_users -> Sink: send_notification



But with either of the two lines marked as BUG above, Flink builds and
executes the plan only up to filtered_users and does not print the log message.


How can I figure out what is wrong with the check function or the notifier.send
function that prevents Flink from building the full plan? What are the
typical mistakes leading to this?
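
For reference, a rough sketch of a quick local check (shown in plain Java since
it is just standard JVM serialization; the same check works from Scala, and
notifier/someStuff are the objects from the snippets above): Flink serializes
user functions, and anything they capture, when it builds the job graph, so a
NotSerializableException here would be one plausible explanation for the plan
stopping at filtered_users.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Quick local check: if this throws java.io.NotSerializableException for
// `notifier` (or `someStuff`), Flink's job graph construction would fail too.
static void assertSerializable(Object o) throws IOException {
    ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
    out.writeObject(o);
    out.close();
}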

Thanks,
+satish


A question regarding the checkpoint mechanism

2016-10-14 Thread Li Wang
Hi all,

As far as I know, a stateful operator will checkpoint its current state to
persistent storage when it receives the barriers from all of its upstream
operators. My question is: does the operator performing the checkpoint need to
pause processing the input tuples for the next batch until the checkpoint is
done? If yes, will it introduce significant processing latency when the state
is large? If no, does this require the operator state to be immutable?

Thanks,
Li

Task and Operator Monitoring via JMX / naming

2016-10-14 Thread Philipp Bussche
Hi there,
I am struggling to understand what I am looking at after enabling JMX
metric reporting on my taskmanager.
The job I am trying this out with has 1 source, 2 map functions (where one
is a RichMap) and 3 sinks.
This is how I have written my Job:

DataStream invitations = streaming
        .addSource(new FlinkKafkaConsumer09<>(
                dsp.getPropertyAsString("kafka.invitation.topic"),
                new InvitationSchema(),
                kafkaProps)).name("KafkaSource");
invitations.addSink(new PostgresqlInvitationDetailsSink<>(psqlConfig))
        .name("InvitationDetailSink");

DataStream> tokens = invitations
        .map(new TokenExtractor()).name("TokenMapStream");
tokens.addSink(new PostgresqlInvitationTokenSink<>(psqlConfig))
        .name("InvitationTokenSink");

DataStream> invitationResponses = invitations
        .map(new InvitationDetailsExtractor(psqlConfig, tokenToSenderMapping))
        .name("InvitationDetailsRichMapStream");
invitationResponses.addSink(new Neo4JInvitationSink<>(neo4jConfig))
        .name("InvitationRelationshipSink");

streaming.execute("InvitationJob");

Somehow I was expecting to have metrics representing the source, the sinks and
the operators; however, instead of 6 entries in my JMX tree I only have 4.
Please see the screenshot attached. I was also expecting the JMX objects to be
named like my task / operator names, but there is all sorts of prefix/suffix
magic around the names. Finally, I have one custom metric which is attached to
my RichMapFunction (InvitationDetailsExtractor). However, the custom metric
(invitationDetailsAdded) shows up under an object where one of the keys (which
I would expect to be set to the operator name) is a combination of the prefix
"Sink", the name of the first sink that I am using, and the name of the first
map function (which is not the RichMapFunction: my custom metric
"invitationDetailsAdded" shows up under "(Sink-_InvitationDetailSink-_TokenMapStream",
which is very confusing because this metric is actually incremented as part of
the InvitationDetailsRichMapStream).
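
For what it is worth, one explanation that would be consistent with 4 entries
instead of 6 is operator chaining: Flink chains operators such as
source -> map -> sink into a single task where possible, and the task-level
metric scope then carries the combined name. A rough sketch (an experiment, not
a confirmed fix) of how chaining could be switched off to check whether one JMX
entry per operator shows up; streaming is the StreamExecutionEnvironment from
the job above:

// Disable chaining for the whole job, so every operator runs as its own task
// and should report metrics under its own scope.
streaming.disableOperatorChaining();

// Alternatively, break the chain around a single operator only:
invitations
        .map(new TokenExtractor()).name("TokenMapStream")
        .disableChaining();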

Can somebody please explain what I can expect from the metrics exposed via JMX
(should they really represent my tasks and operators) and why the naming is
so strange?

Thanks
Philipp


 





Re: Flink Cluster is savepointing jobmanager instead of external filesystem

2016-10-14 Thread Aljoscha Krettek
Hi,
are you running this standalone or on YARN?

I'm also directly looping in Ufuk because he knows that stuff best. Maybe
he has an idea what could be going on.

Cheers,
Aljoscha

On Fri, 14 Oct 2016 at 18:39 Jason Brelloch  wrote:

> Hey all,
>
> We are running into an issue where the savepoint command is saving to the
> jobmanager instead of the filesystem location specified in the
> flink-conf.yaml. It saves to the filesystem when we are running a local
> standalone Flink, but when we deploy the job to our Flink cluster it saves
> to the jobmanager. The flink-conf is nearly identical in both places.
> Here are the relevant savepoint parameters.
>
> savepoints.state.backend: filesystem
> savepoints.state.backend.fs.dir: gs://bettercloud-non-prod/flink-savepoints
>
> Is there another configuration we have to set that is specific to clusters
> to make this work?  Or can anyone think of a reason this would occur?
>
> --
> *Jason Brelloch* | Product Developer
> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
> 
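
(A hedged aside rather than a confirmed answer: the savepoint is triggered with
roughly

    bin/flink savepoint <jobID>

and, as far as I know, the savepoints.state.backend settings are read from the
flink-conf.yaml of the JobManager that runs the job, not from the client that
issues the savepoint command. For a cluster deployment it may therefore be
worth double-checking which flink-conf.yaml the cluster's JobManager actually
loads.)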


Re: job failure with checkpointing enabled

2016-10-14 Thread Aljoscha Krettek
Hi,
the file that Flink is trying to create there is not meant to be in the
checkpointing location. It is a local file that is used for buffering
elements until a checkpoint barrier arrives (for certain cases). Can you
check whether the base path where it is trying to create that file exists?
For the exception that you posted that would be:
/tmp/flink-io-202fdf67-3f8c-47dd-8ebc-2265430644ed
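
If it helps: as far as I remember, those flink-io-* directories are created
under the TaskManager temp directories, which are configured via

taskmanager.tmp.dirs: /tmp

in flink-conf.yaml (the default is the system temp directory), so it is worth
making sure that whatever is configured there exists and is writable on every
TaskManager.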

Cheers,
Aljoscha

On Fri, 14 Oct 2016 at 17:37  wrote:

> I recently tried enabling checkpointing in a job (that previously worked
> w/o checkpointing) and received the following failure on job execution:
>
>
>
> java.io.FileNotFoundException:
> /tmp/flink-io-202fdf67-3f8c-47dd-8ebc-2265430644ed/a426eb27761575b3b79e464719bba96e16a1869d85bae292a2ef7eb72fa8a14c.0.buffer
> (No such file or directory)
>         at java.io.RandomAccessFile.open0(Native Method)
>         at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
>         at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
>         at org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:247)
>         at org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:117)
>         at org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:94)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.<init>(StreamInputProcessor.java:96)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:49)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:239)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> The job then restarts and fails again in an endless cycle.
>
>
>
> This feels like a configuration issue.  My guess is that Flink is looking
> for the file above on local storage, though we’ve configured checkpointing
> to use hdfs (see below).
>
>
>
> To enable checkpointing, this is what I did:
>
> env.enableCheckpointing(3000l);
>
>
>
> Relevant configurations in flink-conf.yaml:
>
> state.backend: filesystem
>
> state.backend.fs.checkpointdir:
> hdfs://myhadoopnamenode:8020/apps/flink/checkpoints
>
>
>
> Note, the directory we’ve configured is not the same as the path indicated
> in the error.
>
>
>
> Interestingly, there are plenty of subdirs in my checkpoints directory,
> these appear to correspond to job start times, even though these jobs don’t
> have checkpointing enabled:
>
> drwxr-xr-x   - rtap hdfs  0 2016-10-13 07:48
> /apps/flink/checkpoints/b4870565f148cff10478dca8bff27bf7
>
> drwxr-xr-x   - rtap hdfs  0 2016-10-13 08:27
> /apps/flink/checkpoints/044b21a0f252b6142e7ddfee7bfbd7d5
>
> drwxr-xr-x   - rtap hdfs  0 2016-10-13 08:36
> /apps/flink/checkpoints/a658b23c2d2adf982a2cf317bfb3d3de
>
> drwxr-xr-x   - rtap hdfs  0 2016-10-14 07:38
> /apps/flink/checkpoints/1156bd1796105ad95a8625cb28a0b816
>
> drwxr-xr-x   - rtap hdfs  0 2016-10-14 07:41
> /apps/flink/checkpoints/58fdd94b7836a3b3ed9abc5c8f3a1dd5
>
> drwxr-xr-x   - rtap hdfs  0 2016-10-14 07:43
> /apps/flink/checkpoints/47a849a8ed6538b9e7d3826a628d38b9
>
> drwxr-xr-x   - rtap hdfs  0 2016-10-14 07:49
> /apps/flink/checkpoints/e6a9e2300ea5c36341fa160adab789f0
>
>
>
> Thanks!
>
>
>
>
>


Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Shannon Carey
Yep!

From: Fabian Hueske
Date: Friday, October 14, 2016 at 11:00 AM
To: Shannon Carey
Cc: "user@flink.apache.org"
Subject: Re: [DISCUSS] Deprecate Hadoop source method from (batch)
ExecutionEnvironment

Hi Shannon,

the plan is as follows:

We will keep the methods as they are for 1.2 but deprecate them and at the same 
time we will add alternatives in an optional dependency.
In a later release, the deprecated methods will be removed and everybody has to 
switch to the optional dependency.

Does that work for you?

Best, Fabian

2016-10-14 17:30 GMT+02:00 Shannon Carey:
Speaking as a user, if you are suggesting that you will retain the 
functionality but move the methods to an optional dependency, it makes sense to 
me. We have used the Hadoop integration for AvroParquetInputFormat and 
CqlBulkOutputFormat in Flink (although we won't be using CqlBulkOutputFormat 
any longer because it doesn't seem to be reliable).

-Shannon

From: Fabian Hueske
Date: Friday, October 14, 2016 at 4:29 AM
To: "d...@flink.apache.org"
Subject: [DISCUSS] Deprecate Hadoop source method from (batch) 
ExecutionEnvironment

Hi everybody,

I would like to propose to deprecate the utility methods to read data with 
Hadoop InputFormats from the (batch) ExecutionEnvironment.

The motivation for deprecating these methods is to reduce Flink's dependency on
Hadoop and instead have Hadoop as an optional dependency for users that actually
need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a Flink
distribution that does not have a hard Hadoop dependency.

One step for this is to remove the Hadoop dependency from flink-java (Flink's
Java DataSet API), which is currently required due to the above utility methods
(see FLINK-4315). We recently received a PR that addresses FLINK-4315 and
removes the Hadoop methods from the ExecutionEnvironment. After some
discussion, it was decided to defer the PR to Flink 2.0 because it breaks the
API (these methods are declared @PublicEvolving).

I propose to accept this PR for Flink 1.2, but instead of removing the methods,
deprecate them.
This would help to migrate old code and prevent new usage of these methods.
For a later Flink release (1.3 or 2.0) we could remove these methods and the 
Hadoop dependency on flink-java.

What do others think?

Best, Fabian



Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Fabian Hueske
Hi Shannon,

the plan is as follows:

We will keep the methods as they are for 1.2 but deprecate them and at the
same time we will add alternatives in an optional dependency.
In a later release, the deprecated methods will be removed and everybody
has to switch to the optional dependency.

Does that work for you?

Best, Fabian

2016-10-14 17:30 GMT+02:00 Shannon Carey :

> Speaking as a user, if you are suggesting that you will retain the
> functionality but move the methods to an optional dependency, it makes
> sense to me. We have used the Hadoop integration for
> AvroParquetInputFormat and CqlBulkOutputFormat in Flink (although we won't
> be using CqlBulkOutputFormat any longer because it doesn't seem to be
> reliable).
>
> -Shannon
>
> From: Fabian Hueske 
> Date: Friday, October 14, 2016 at 4:29 AM
> To: , "d...@flink.apache.org" 
> Subject: [DISCUSS] Deprecate Hadoop source method from (batch)
> ExecutionEnvironment
>
> Hi everybody,
>
> I would like to propose to deprecate the utility methods to read data with
> Hadoop InputFormats from the (batch) ExecutionEnvironment.
>
> The motivation for deprecating these methods is to reduce Flink's dependency
> on Hadoop and instead have Hadoop as an optional dependency for users that
> actually need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a
> Flink distribution that does not have a hard Hadoop dependency.
>
> One step for this is to remove the Hadoop dependency from flink-java
> (Flink's Java DataSet API), which is currently required due to the above
> utility methods (see FLINK-4315). We recently received a PR that addresses
> FLINK-4315 and removes the Hadoop methods from the ExecutionEnvironment.
> After some discussion, it was decided to defer the PR to Flink 2.0 because
> it breaks the API (these methods are declared @PublicEvolving).
>
> I propose to accept this PR for Flink 1.2, but instead of removing the
> methods, deprecate them.
> This would help to migrate old code and prevent new usage of these methods.
> For a later Flink release (1.3 or 2.0) we could remove these methods and
> the Hadoop dependency on flink-java.
>
> What do others think?
>
> Best, Fabian
>


job failure with checkpointing enabled

2016-10-14 Thread robert.lancaster
I recently tried enabling checkpointing in a job (that previously worked w/o
checkpointing) and received the following failure on job execution:

java.io.FileNotFoundException:
/tmp/flink-io-202fdf67-3f8c-47dd-8ebc-2265430644ed/a426eb27761575b3b79e464719bba96e16a1869d85bae292a2ef7eb72fa8a14c.0.buffer
(No such file or directory)
        at java.io.RandomAccessFile.open0(Native Method)
        at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
        at org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:247)
        at org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:117)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:94)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.<init>(StreamInputProcessor.java:96)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:49)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:239)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)


The job then restarts and fails again in an endless cycle.

This feels like a configuration issue.  My guess is that Flink is looking for 
the file above on local storage, though we’ve configured checkpointing to use 
hdfs (see below).

To enable checkpointing, this is what I did:
env.enableCheckpointing(3000l);

Relevant configurations in flink-conf.yaml:
state.backend: filesystem
state.backend.fs.checkpointdir: 
hdfs://myhadoopnamenode:8020/apps/flink/checkpoints

Note, the directory we’ve configured is not the same as the path indicated in 
the error.

Interestingly, there are plenty of subdirs in my checkpoints directory, these 
appear to correspond to job start times, even though these jobs don’t have 
checkpointing enabled:
drwxr-xr-x   - rtap hdfs  0 2016-10-13 07:48 
/apps/flink/checkpoints/b4870565f148cff10478dca8bff27bf7
drwxr-xr-x   - rtap hdfs  0 2016-10-13 08:27 
/apps/flink/checkpoints/044b21a0f252b6142e7ddfee7bfbd7d5
drwxr-xr-x   - rtap hdfs  0 2016-10-13 08:36 
/apps/flink/checkpoints/a658b23c2d2adf982a2cf317bfb3d3de
drwxr-xr-x   - rtap hdfs  0 2016-10-14 07:38 
/apps/flink/checkpoints/1156bd1796105ad95a8625cb28a0b816
drwxr-xr-x   - rtap hdfs  0 2016-10-14 07:41 
/apps/flink/checkpoints/58fdd94b7836a3b3ed9abc5c8f3a1dd5
drwxr-xr-x   - rtap hdfs  0 2016-10-14 07:43 
/apps/flink/checkpoints/47a849a8ed6538b9e7d3826a628d38b9
drwxr-xr-x   - rtap hdfs  0 2016-10-14 07:49 
/apps/flink/checkpoints/e6a9e2300ea5c36341fa160adab789f0

Thanks!






Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Shannon Carey
Speaking as a user, if you are suggesting that you will retain the 
functionality but move the methods to an optional dependency, it makes sense to 
me. We have used the Hadoop integration for AvroParquetInputFormat and 
CqlBulkOutputFormat in Flink (although we won't be using CqlBulkOutputFormat 
any longer because it doesn't seem to be reliable).

-Shannon

From: Fabian Hueske
Date: Friday, October 14, 2016 at 4:29 AM
To: "d...@flink.apache.org"
Subject: [DISCUSS] Deprecate Hadoop source method from (batch) 
ExecutionEnvironment

Hi everybody,

I would like to propose to deprecate the utility methods to read data with 
Hadoop InputFormats from the (batch) ExecutionEnvironment.

The motivation for deprecating these methods is to reduce Flink's dependency on
Hadoop and instead have Hadoop as an optional dependency for users that actually
need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a Flink
distribution that does not have a hard Hadoop dependency.

One step for this is to remove the Hadoop dependency from flink-java (Flink's
Java DataSet API), which is currently required due to the above utility methods
(see FLINK-4315). We recently received a PR that addresses FLINK-4315 and
removes the Hadoop methods from the ExecutionEnvironment. After some
discussion, it was decided to defer the PR to Flink 2.0 because it breaks the
API (these methods are declared @PublicEvolving).

I propose to accept this PR for Flink 1.2, but instead of removing the methods,
deprecate them.
This would help to migrate old code and prevent new usage of these methods.
For a later Flink release (1.3 or 2.0) we could remove these methods and the 
Hadoop dependency on flink-java.

What do others think?

Best, Fabian


Re: HivePartitionTap is not working with cascading flink

2016-10-14 Thread Fabian Hueske
Hi Santlal,

I'm afraid I don't know what is going wrong here either.
Debugging and correctly configuring the Taps was one of the major obstacles
when implementing the connector.

Best, Fabian

2016-10-14 14:40 GMT+02:00 Aljoscha Krettek :

> +Fabian directly looping in Fabian since he worked on the Cascading/Flink
> integration.
>
> Do you have any idea about this?
>
>
> On Fri, 14 Oct 2016 at 08:42 Santlal J Gupta wrote:
>
>> Hi,
>>
>>
>>
>> I am new to Flink. I have created a Cascading job and used
>> FlinkFlowConnector. I have used HivePartitionTap to create a
>> partition, but it fails and gives SQLIntegrityConstraintViolationException
>> and AlreadyExistsException. I had posted the question on the Cascading user
>> group, but I have not got a solution there.
>>
>>
>>
>> For more you can view cascading user group link:
>> https://groups.google.com/forum/#!topic/cascading-user/PqyT9SLDQyk
>>
>>
>>
>> Thanks
>>
>> Santlal J Gupta
>>
>>
>


Re: HivePartitionTap is not working with cascading flink

2016-10-14 Thread Aljoscha Krettek
+Fabian directly looping in Fabian since he worked on the Cascading/Flink
integration.

Do you have any idea about this?

On Fri, 14 Oct 2016 at 08:42 Santlal J Gupta <
santlal.gu...@bitwiseglobal.com> wrote:

> Hi,
>
>
>
> I am new to Flink. I have created a Cascading job and used
> FlinkFlowConnector. I have used HivePartitionTap to create a partition,
> but it fails and gives SQLIntegrityConstraintViolationException
> and AlreadyExistsException. I had posted the question on the Cascading user
> group, but I have not got a solution there.
>
>
>
> For more you can view cascading user group link:
> https://groups.google.com/forum/#!topic/cascading-user/PqyT9SLDQyk
>
>
>
> Thanks
>
> Santlal J Gupta
>
>
> 
>
>


Re: ConcurrentModificationException when using histogram accumulators

2016-10-14 Thread Till Rohrmann
Hi Yukun,

I think you've found a bug in the code. The accumulators don't seem to be
really thread-safe. I've created an issue to fix this [1]. Thanks for
reporting the problem :-)

[1] https://issues.apache.org/jira/browse/FLINK-4829

Cheers,
Till

On Fri, Oct 14, 2016 at 8:32 AM, Yukun Guo  wrote:

> This happens when the TaskManager is serializing an
> org.apache.flink.api.common.accumulators.Histogram by iterating through
> the underlying TreeMap while a MapFunction for updating the accumulator
> attempts to modify the TreeMap concurrently. How could I fix it?
>
>
> The call stack:
>
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
> java.util.ConcurrentModificationException
>         at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
>         at java.util.TreeMap.writeObject(TreeMap.java:2436)
>         at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>         at java.util.HashMap.writeObject(HashMap.java:1362)
>         at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
>         at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
>         at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>         at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
>         ...
>
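
For reference, the situation described above boils down to roughly the
following usage pattern (names are illustrative, not taken from Yukun's job): a
map function updates a Histogram accumulator on the task thread while the
TaskManager's heartbeat thread serializes the same accumulator for the snapshot
it sends to the JobManager, which is where the ConcurrentModificationException
surfaces.

import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class LatencyHistogramMapper extends RichMapFunction<Integer, Integer> {

    private final Histogram histogram = new Histogram();

    @Override
    public void open(Configuration parameters) {
        // Registered once per task; the TaskManager later serializes it
        // periodically (heartbeat) and when the job finishes.
        getRuntimeContext().addAccumulator("latency-histogram", histogram);
    }

    @Override
    public Integer map(Integer value) {
        // Mutates the Histogram's internal TreeMap on the task thread,
        // potentially while the heartbeat serialization above is running.
        histogram.add(value);
        return value;
    }
}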


Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Aljoscha Krettek
+1 for deprecating and then removing.

On Fri, 14 Oct 2016 at 11:38 Till Rohrmann  wrote:

> Fabian's proposal sounds good to me. It would be a good first step towards
> removing our dependency on Hadoop.
>
> Thus, +1 for the changes.
>
> Cheers,
> Till
>
> On Fri, Oct 14, 2016 at 11:29 AM, Fabian Hueske  wrote:
>
> Hi everybody,
>
> I would like to propose to deprecate the utility methods to read data with
> Hadoop InputFormats from the (batch) ExecutionEnvironment.
>
> The motivation for deprecating these methods is to reduce Flink's dependency
> on Hadoop and instead have Hadoop as an optional dependency for users that
> actually need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a
> Flink distribution that does not have a hard Hadoop dependency.
>
> One step for this is to remove the Hadoop dependency from flink-java
> (Flink's Java DataSet API), which is currently required due to the above
> utility methods (see FLINK-4315). We recently received a PR that addresses
> FLINK-4315 and removes the Hadoop methods from the ExecutionEnvironment.
> After some discussion, it was decided to defer the PR to Flink 2.0 because
> it breaks the API (these methods are declared @PublicEvolving).
>
> I propose to accept this PR for Flink 1.2, but instead of removing the
> methods, deprecate them.
> This would help to migrate old code and prevent new usage of these methods.
> For a later Flink release (1.3 or 2.0) we could remove these methods and
> the Hadoop dependency on flink-java.
>
> What do others think?
>
> Best, Fabian
>
>
>


Re: Listening to timed-out patterns in Flink CEP

2016-10-14 Thread Till Rohrmann
Hi guys,

I'll try to come up with an example illustrating the behaviour over the
weekend.

Cheers,
Till

On Fri, Oct 14, 2016 at 11:16 AM, David Koch  wrote:

> Hello,
>
> Thanks for the code Sameer. Unfortunately, it didn't solve the issue.
> Compared to what I did the principle is the same - make sure that the
> watermark advances even without events present to trigger timeouts in CEP
> patterns.
>
> If Till or anyone else could provide a minimal example illustrating the
> supposed behaviour of:
>
> [CEP] timeout will be detected when the first watermark exceeding the
>> timeout value is received
>
>
> I'd very much appreciate it.
>
> Regards,
>
> David
>
>
> On Wed, Oct 12, 2016 at 1:54 AM, Sameer W  wrote:
>
>> Try this. Your WMs need to move forward. Also, don't use the system
>> timestamp; use the timestamp of the element seen as the reference, as the
>> elements are most likely lagging the system timestamp.
>>
>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>     .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>
>>         long waterMarkTmst;
>>         long lastEmittedWM = 0;
>>
>>         @Override
>>         public long extractTimestamp(Event element, long previousElementTimestamp) {
>>             if (element.tmst > lastEmittedWM) {
>>                 // Assumes increasing timestamps. Need to subtract 1 as more
>>                 // elements with the same TS might arrive.
>>                 waterMarkTmst = element.tmst - 1;
>>             }
>>             return element.tmst;
>>         }
>>
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>             if (lastEmittedWM == waterMarkTmst) {
>>                 // No new event seen, move the WM forward by the auto watermark
>>                 // interval (watermarks only move forward in time).
>>                 waterMarkTmst = waterMarkTmst + 1000L;
>>             }
>>             lastEmittedWM = waterMarkTmst;
>>
>>             System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>             // Until an event is seen, WM==0 starts advancing by 1000ms.
>>             return new Watermark(waterMarkTmst);
>>         }
>>     }).keyBy("key");
>>
>> On Tue, Oct 11, 2016 at 7:29 PM, David Koch 
>> wrote:
>>
>>> Hello,
>>>
>>> I tried setting the watermark to System.currentTimeMillis() - 5000L,
>>> event timestamps are System.currentTimeMillis(). I do not observe the
>>> expected behaviour of the PatternTimeoutFunction firing once the watermark
>>> moves past the timeout "anchored" by a pattern match.
>>>
>>> Here is the complete test class source,
>>> in case someone is interested. The timestamp/watermark assigner looks like
>>> this:
>>>
>>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>>     .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>
>>>         long waterMarkTmst;
>>>
>>>         @Override
>>>         public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>             return element.tmst;
>>>         }
>>>
>>>         @Override
>>>         public Watermark getCurrentWatermark() {
>>>             waterMarkTmst = System.currentTimeMillis() - 5000L;
>>>             System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>>             return new Watermark(waterMarkTmst);
>>>         }
>>>     }).keyBy("key");
>>>
>>> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>>>
>>> // Apply pattern filtering on stream.
>>> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>>>
>>> Any idea what's wrong?
>>>
>>> David
>>>
>>>
>>> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W  wrote:
>>>
 Assuming an element with timestamp which is later than the last emitted
 watermark arrives, would it just be dropped because the PatternStream does
 not have a max allowed lateness method? In that case it appears that CEP
 cannot handle late events yet out of the box.

 If we do want to support late events can we chain a
 keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
 again before handing it to the CEP operator. This way we may have the
 patterns fired multiple times but it allows an event to be late and out of
 order. It looks like it will work but is there a less convoluted way.

 Thanks,
 Sameer

 On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <
 till.rohrm...@gmail.com> wrote:

> But then no element later than the last emitted watermark must be
> issued by the sources. If that is the case, then this solution should 
> work.
>
> Cheers,
> Till
>
> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W 

Re: [DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Till Rohrmann
Fabian's proposal sounds good to me. It would be a good first step towards
removing our dependency on Hadoop.

Thus, +1 for the changes.

Cheers,
Till

On Fri, Oct 14, 2016 at 11:29 AM, Fabian Hueske  wrote:

> Hi everybody,
>
> I would like to propose to deprecate the utility methods to read data with
> Hadoop InputFormats from the (batch) ExecutionEnvironment.
>
> The motivation for deprecating these methods is to reduce Flink's dependency
> on Hadoop and instead have Hadoop as an optional dependency for users that
> actually need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a
> Flink distribution that does not have a hard Hadoop dependency.
>
> One step for this is to remove the Hadoop dependency from flink-java
> (Flink's Java DataSet API), which is currently required due to the above
> utility methods (see FLINK-4315). We recently received a PR that addresses
> FLINK-4315 and removes the Hadoop methods from the ExecutionEnvironment.
> After some discussion, it was decided to defer the PR to Flink 2.0 because
> it breaks the API (these methods are declared @PublicEvolving).
>
> I propose to accept this PR for Flink 1.2, but instead of removing the
> methods, deprecate them.
> This would help to migrate old code and prevent new usage of these methods.
> For a later Flink release (1.3 or 2.0) we could remove these methods and
> the Hadoop dependency on flink-java.
>
> What do others think?
>
> Best, Fabian
>


[DISCUSS] Deprecate Hadoop source method from (batch) ExecutionEnvironment

2016-10-14 Thread Fabian Hueske
Hi everybody,

I would like to propose to deprecate the utility methods to read data with
Hadoop InputFormats from the (batch) ExecutionEnvironment.

The motivation for deprecating these methods is to reduce Flink's dependency
on Hadoop and instead have Hadoop as an optional dependency for users that
actually need it (HDFS, MapRed-Compat, ...). Eventually, we want to have a
Flink distribution that does not have a hard Hadoop dependency.

One step for this is to remove the Hadoop dependency from flink-java
(Flink's Java DataSet API), which is currently required due to the above
utility methods (see FLINK-4315). We recently received a PR that addresses
FLINK-4315 and removes the Hadoop methods from the ExecutionEnvironment.
After some discussion, it was decided to defer the PR to Flink 2.0 because
it breaks the API (these methods are declared @PublicEvolving).

I propose to accept this PR for Flink 1.2, but instead of removing the
methods, deprecate them.
This would help to migrate old code and prevent new usage of these methods.
For a later Flink release (1.3 or 2.0) we could remove these methods and
the Hadoop dependency on flink-java.

What do others think?

Best, Fabian
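
To make the proposal concrete, this is roughly the kind of call that would be
deprecated, and one shape the optional-dependency alternative could take (the
HadoopInputs utility and its home in flink-hadoop-compatibility are an
assumption about what the optional module would provide, not an existing API):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Current utility method on the ExecutionEnvironment (would be deprecated);
// it is what forces the Hadoop dependency onto flink-java.
DataSet<Tuple2<LongWritable, Text>> input =
    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class,
        "hdfs:///path/to/input");

// Possible replacement: the same functionality via a helper in the optional
// flink-hadoop-compatibility module, plugged in through createInput().
DataSet<Tuple2<LongWritable, Text>> input2 =
    env.createInput(HadoopInputs.readHadoopFile(
        new TextInputFormat(), LongWritable.class, Text.class,
        "hdfs:///path/to/input"));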


Re: SVM Multiclass classification

2016-10-14 Thread Theodore Vasiloudis
Hello Kursat,

As noted in the documentation, the SVM implementation is for binary
classification only for the time being.

Regards,
Theodore

-- 
Sent from a mobile device. May contain autocorrect errors.

On Oct 13, 2016 8:53 PM, "Kürşat Kurt"  wrote:

> Hi;
>
>
>
> I am trying to classify documents.
>
> When I try to predict (on the same data as the training set) there are only
> 1 and -1 predictions.
>
> Accuracy is 0%.
>
>
>
>
>
> Can you help me please?
>
>
>
> val env = ExecutionEnvironment.getExecutionEnvironment
>
> val training = Seq(
>   new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))),
>   new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))),
>   new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
>   new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(0), Array(0.0))),
>   new LabeledVector(1.0, new SparseVector(10, Array(0, 3), Array(1.0, 1.0))),
>   new LabeledVector(0.0, new SparseVector(10, Array(0, 2, 3), Array(0.0, 1.0, 1.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(0, 7, 9), Array(0.0, 1.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(2, 3, 4), Array(0.0, 1.0, 1.0))),
>   new LabeledVector(2.0, new SparseVector(10, Array(0, 3), Array(1.0, 1.0))),
>   new LabeledVector(0.0, new SparseVector(10, Array(2, 3, 9), Array(1.0, 0.0, 1.0)))
> );
>
> val trainingDS = env.fromCollection(training)
> val testingDS = env.fromCollection(training)
> val svm = new SVM().setBlocks(env.getParallelism)
> svm.fit(trainingDS)
> val predictions = svm.evaluate(testingDS.map(x => (x.vector, x.label)))
> predictions.print();
>
>
>
> Sample output:
>
> (1.0,1.0)
> (1.0,1.0)
> (0.0,1.0)
> (0.0,-1.0)
> (2.0,1.0)
> (2.0,-1.0)
> (1.0,1.0)
> (0.0,1.0)
> (2.0,1.0)
> (2.0,1.0)
> (2.0,1.0)
> (0.0,1.0)
>