Re: How to stop FlinkKafkaConsumer and make job finished?

2017-12-28 Thread Eron Wright
I believe you can extend the `KeyedDeserializationSchema` that you pass to
the consumer to check for end-of-stream markers.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-

Eron
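
For illustration, a minimal sketch of such a schema, assuming a plain String
payload and a literal "EOF" sentinel record (both of which are assumptions, not
taken from the original job):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Sketch: once the sentinel record is seen, the consumer stops fetching.
public class EofAwareSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) throws IOException {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returning true signals the consumer to stop; the element itself is
        // not emitted downstream.
        return "EOF".equals(nextElement);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Note that each parallel consumer instance stops independently, so the sentinel
has to reach every partition the job consumes.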

On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi  wrote:

> Hey Jaxon,
>
> I don't think it's possible to control this via the life-cycle methods
> of your functions.
>
> Note that Flink currently does not support graceful stop in a
> meaningful manner and you can only cancel running jobs. What comes to
> my mind to cancel on EOF:
>
> 1) Extend Kafka consumer to stop emitting records after your EOF
> record. Look at the flink-connector-kafka-base module. This is
> probably not that feasible, as it takes some work to get familiar with the
> code. Just putting it out there.
>
> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>
> 3) Use an HTTP client and cancel your job via the HTTP endpoint
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
> Easy, but not nice, since you need quite some logic in your function
> (e.g. ignore records after the EOF record until cancellation, etc.).
>
> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>
> – Ufuk
>
>
> On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu  wrote:
> > I would like to manually stop the FlinkKafkaConsumer from consuming data
> > from Kafka. But I find it won't close when I invoke the "cancel()" method.
> > What I am trying to do is add an EOF symbol marking the end of my Kafka
> > data, and when the FlatMap operator reads the symbol it invokes the
> > FlinkKafkaConsumer's "cancel()" method. It doesn't work. A Flink streaming
> > job won't finish unless it gets canceled or fails, when I use Kafka as the
> > source.
> >
> > Could somebody give me some help? Thanks.
>
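
As a side note, option 2 from Ufuk's reply above could look roughly like the
sketch below. "MyEvent" and its isEof() flag are made up for illustration, and
with a restart strategy configured the job would be restarted rather than
terminated, so restarts would need to be disabled for this trick to work:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class EofFailingFlatMap implements FlatMapFunction<MyEvent, MyEvent> {

    // Marker exception, i.e. the "SuccessException" mentioned above.
    public static class SuccessException extends RuntimeException {
        public SuccessException() {
            super("Reached end-of-stream marker");
        }
    }

    @Override
    public void flatMap(MyEvent value, Collector<MyEvent> out) {
        if (value.isEof()) {
            // Deliberately fail the job once the EOF record arrives.
            throw new SuccessException();
        }
        out.collect(value);
    }
}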


Re: RichAsyncFunction in scala

2017-12-28 Thread Antoine Philippot
Hi Ufuk,

I don't think that is possible, as I pass this function as a parameter to
AsyncDataStream (from the Scala API), which is mandatory when working with a
Scala DataStream.



On Thu, Dec 28, 2017 at 4:55 PM, Ufuk Celebi wrote:

> Hey Antoine,
>
> isn't it possible to use the Java RichAsyncFunction from Scala like this:
>
> class Test extends RichAsyncFunction[Int, Int] {
>
>   override def open(parameters: Configuration): Unit =
> super.open(parameters)
>
>   override def asyncInvoke(input: Int, resultFuture:
> functions.async.ResultFuture[Int]): Unit = ???
> }
>
> – Ufuk
>
>
>
> On Thu, Dec 28, 2017 at 4:37 PM, Antoine Philippot
>  wrote:
> > Hi,
> >
> > The Scala API lacks a version of the RichAsyncFunction class, or the
> > possibility to handle a class which extends AbstractRichFunction and
> > implements AsyncFunction (from the Scala API).
> >
> > I made a small change on our current Flink fork because we need to use the
> > open method to add our custom metrics via the
> > getRuntimeContext.getMetricGroup method.
> >
> > https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba
> >
> > Do you already plan to release this feature soon? Do you want me to create
> > a new Jira ticket or propose a pull request?
> >
> > Antoine
>


Re: RichAsyncFunction in scala

2017-12-28 Thread Ufuk Celebi
Hey Antoine,

isn't it possible to use the Java RichAsyncFunction from Scala like this:

class Test extends RichAsyncFunction[Int, Int] {

  override def open(parameters: Configuration): Unit = super.open(parameters)

  override def asyncInvoke(input: Int, resultFuture:
functions.async.ResultFuture[Int]): Unit = ???
}

– Ufuk



On Thu, Dec 28, 2017 at 4:37 PM, Antoine Philippot
 wrote:
> Hi,
>
> The Scala API lacks a version of the RichAsyncFunction class, or the
> possibility to handle a class which extends AbstractRichFunction and
> implements AsyncFunction (from the Scala API).
>
> I made a small change on our current Flink fork because we need to use the
> open method to add our custom metrics via the getRuntimeContext.getMetricGroup
> method.
> https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba
>
> Do you already plan to release this feature soon? Do you want me to create
> a new Jira ticket or propose a pull request?
>
> Antoine


RichAsyncFunction in scala

2017-12-28 Thread Antoine Philippot
Hi,

The Scala API lacks a version of the RichAsyncFunction class, or the
possibility to handle a class which extends AbstractRichFunction and
implements AsyncFunction (from the Scala API).

I made a small change on our current Flink fork because we need to use the
open method to add our custom metrics via the getRuntimeContext.getMetricGroup
method.
https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba

Do you already plan to release this feature soon? Do you want me to create
a new Jira ticket or propose a pull request?

Antoine
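
For reference, registering a custom metric in open() with the Java
RichAsyncFunction that Ufuk suggested looks roughly like the sketch below. It
mirrors the types used in Ufuk's snippet; exact class names can differ between
Flink versions, and whether such a function can be passed to the Scala
AsyncDataStream is exactly the open question of this thread:

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class MetricsAsyncFunction extends RichAsyncFunction<Integer, Integer> {

    private transient Counter requests;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Custom metric registered via the runtime context, as described above.
        requests = getRuntimeContext().getMetricGroup().counter("asyncRequests");
    }

    @Override
    public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
        requests.inc();
        // A real implementation would issue a non-blocking call here.
        resultFuture.complete(Collections.singleton(input));
    }
}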


Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-28 Thread Hao Sun
Ok, thanks for the clarification.

On Thu, Dec 28, 2017 at 1:05 AM Ufuk Celebi  wrote:

> On Thu, Dec 28, 2017 at 12:11 AM, Hao Sun  wrote:
> > Thanks! Great to know I do not have to worry about duplicates inside Flink.
> >
> > One more question: why does this happen? Because the TM and the JM check
> > leadership at different intervals?
>
> Yes, it's not deterministic how this happens. There will also be cases
> when the JM notices before the TM.
>
>


Re: How to apply patterns from a source onto another datastream?

2017-12-28 Thread Kostas Kloudas
Hi Jayant, 

As Dawid said, dynamically updating patterns is currently not supported.
There is also this question raised on the dev mailing list with the
subject "CEP: Dynamic Patterns".

I will repeat my answer here so that we are on the same page: 

"To support this, we need 2 features with one having to be added in Flink 
itself,
and the other to the CEP library.

The first one is broadcast state and the ability to connect keyed and non-keyed 
streams. This one is to be added to Flink itself and the good news are that 
this 
feature is scheduled to be added to Flink 1.5.

The second feature is to modify the CEP operator so that it can support multiple
patterns and match incoming events against all of them. For this I have no clear
deadline in my mind, but given that there are more and more people asking for it,
I think it is going to be added soon."


Now for implementing CEP on top of Flink’s windowing mechanism, I would not
consider it as straightforward as it sounds. Conceptually, CEP forms windows
and within these windows you search for matching patterns. But CEP’s windowing
semantics differ drastically from Flink’s windowing. Windows in CEP are created
whenever an event that matches the first element in a pattern arrives. In Flink,
a window is created and considered complete either based on time (tumbling/
sliding) or when a specific time interval expires without any activity in the
stream (session).
So in one case (CEP), "window" boundaries are defined based on event properties,
while in the other (Flink windowing), they are specified based on time. In
addition, in CEP the order in which elements are consumed matters, as the
pattern is essentially a state machine. In Flink, elements are added to a
window in the order in which they arrive.

In any case, I would consider an effort to re-implement CEP on top of Flink’s
windowing far from trivial. If your use case is simple and fits into Flink’s
windowing semantics, then go ahead. But if not, I would recommend waiting a bit
more for the feature to be supported by the library.

Regards, 
Kostas


> On Dec 25, 2017, at 5:53 AM, Jayant Ameta  wrote:
> 
> Hi Dawid,
> Since dynamic patterns are not available in Flink CEP, I am thinking about
> skipping CEP altogether and mimicking the functionality using a windowed
> stream.
> I am mostly interested in the times and within methods. Basically, I would
> rewrite my own logic on a windowed stream to match the pattern and count the
> number of matching events within a time window.
> Do you know if there is any similar example in the docs?
> 
> Jayant Ameta
> 
> On Fri, Dec 22, 2017 at 1:24 PM, Dawid Wysakowicz wrote:
> Hi Jayant,
> 
> Could you elaborate a bit more on what you mean? Flink’s windows are not used
> in Flink CEP. They are a different concept.
> 
> > On 20 Dec 2017, at 09:23, Jayant Ameta wrote:
> >
> > Would it be possible to get the same result using windows?
> >
> > Jayant Ameta
> >
> > On Tue, Dec 19, 2017 at 3:23 PM, Dawid Wysakowicz wrote:
> > It is not possible at this moment. FlinkCEP can handle only one Pattern
> > applied statically. There is a JIRA ticket for that:
> > https://issues.apache.org/jira/browse/FLINK-7129
> >
> > > On 19 Dec 2017, at 10:10, Jayant Ameta wrote:
> > >
> > > I have a datastream of events, and another datastream of patterns. The
> > > patterns are provided by users at runtime, and they need to come via a
> > > Kafka topic. I need to apply each of the patterns on the event stream
> > > using Flink-CEP. Is there a way to get a PatternStream from the
> > > DataStream when I don't know the pattern beforehand?
> > >
> > > https://stackoverflow.com/questions/47883408/apache-flink-how-to-apply-patterns-from-a-source-onto-another-datastream
> > >  
> > > 
> >
> >
> 
> 
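
For what it's worth, the windowed-count approach Jayant describes above could
start from something like the following sketch. The event type and its fields
are hypothetical, and this only covers a simple "count matching events per key
per window" case; it does not capture the ordering semantics of a real CEP
pattern that Kostas points out:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedMatchCount {

    // Hypothetical event type, used only for this sketch.
    public static class Event {
        public String userId;
        public boolean suspicious;
    }

    // Counts, per key and per 10-minute tumbling window, the events that satisfy
    // a simple predicate, as a rough stand-in for "times(n) within(t)" logic.
    public static DataStream<Long> countMatches(DataStream<Event> events) {
        return events
                .keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event ev) {
                        return ev.userId;
                    }
                })
                .timeWindow(Time.minutes(10))
                .apply(new WindowFunction<Event, Long, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window,
                                      Iterable<Event> input, Collector<Long> out) {
                        long matches = 0;
                        for (Event ev : input) {
                            if (ev.suspicious) {
                                matches++;
                            }
                        }
                        out.collect(matches);
                    }
                });
    }
}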



Re: keyby() issue

2017-12-28 Thread Jinhua Luo
It's very strange: when I change the key selector to use a random key,
the JVM reports an OOM error.

   .keyBy(new KeySelector<MyEvent, Integer>() {
        public Integer getKey(MyEvent ev) {
            return ThreadLocalRandom.current().nextInt(1, 100);
        }
    })

Caused by: java.lang.OutOfMemoryError: Java heap space
at 
com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

Could anybody explain the internals of keyBy()?

2017-12-28 17:33 GMT+08:00 Ufuk Celebi :
> Hey Jinhua,
>
> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo  wrote:
>> keyBy() on the field would generate a unique key per field value, so if the
>> number of unique values is huge, Flink would have trouble both on CPU and
>> memory. Is this considered in the design of Flink?
>
> Yes, keyBy hash partitions the data across the nodes of your Flink
> application and thus you can easily scale your application up if you
> need more processing power.
>
> I'm not sure that this is the problem in your case though. Can you
> provide some more details what you are doing exactly? Are you
> aggregating by time (for the keyBy you mention no windowing, but then
> you mention windowAll)? What kind of aggregation are you doing? If
> possible, feel free to share some code.
>
>> Since windowAll() could have its parallelism set, I tried to use a key
>> selector that uses the field's hash rather than its value, hoping it would
>> decrease the number of keys, but Flink throws a key out-of-range exception.
>> How do I use a key selector in the correct way?
>
> Can you paste the exact exception you get? I think this might indicate
> that you don't correctly extract the key from your record, e.g. you
> extract a different key on sender and receiver.
>
> I'm sure we can figure this out after you provide more context. :-)
>
> – Ufuk


Re: keyby() issue

2017-12-28 Thread Jinhua Luo
Does keyBy() on a field generate the same number of keys as the number of
unique values of the field?
For example, if the field takes values in the range {"a", "b", "c"}, then the
number of keys is 3, correct?
The field in my case has half a million unique values (IP addresses), so
keyBy() on the field followed by timeWindow() would generate half a million
partitions?

If I use a key selector instead, e.g.

  .keyBy(new KeySelector<MyEvent, Long>() {
      public Long getKey(MyEvent ev) { return ev.hashCode() % 137L; }
  })

then the number of partitions could be limited to 137, correct?


2017-12-28 17:33 GMT+08:00 Ufuk Celebi :
> Hey Jinhua,
>
> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo  wrote:
>> keyBy() on the field would generate a unique key per field value, so if the
>> number of unique values is huge, Flink would have trouble both on CPU and
>> memory. Is this considered in the design of Flink?
>
> Yes, keyBy hash partitions the data across the nodes of your Flink
> application and thus you can easily scale your application up if you
> need more processing power.
>
> I'm not sure that this is the problem in your case though. Can you
> provide some more details what you are doing exactly? Are you
> aggregating by time (for the keyBy you mention no windowing, but then
> you mention windowAll)? What kind of aggregation are you doing? If
> possible, feel free to share some code.
>
>> Since windowAll() could have its parallelism set, I tried to use a key
>> selector that uses the field's hash rather than its value, hoping it would
>> decrease the number of keys, but Flink throws a key out-of-range exception.
>> How do I use a key selector in the correct way?
>
> Can you paste the exact exception you get? I think this might indicate
> that you don't correctly extract the key from your record, e.g. you
> extract a different key on sender and receiver.
>
> I'm sure we can figure this out after you provide more context. :-)
>
> – Ufuk


Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-28 Thread shashank agarwal
Hi Michael,

Thanks for the response. Actually, I have already solved the issue; I was
sharing how I solved it. For sinking Scala classes like a Java POJO, we
previously had to convert the stream to a Java DataStream first, but in 1.4
that is already done by the connector, so there is no need to do it ourselves.

We have to write the Scala class like this:

@SerialVersionUID(507L)
@Table(keyspace = "twtt", name = "order")
class OrderFinal(
    @BeanProperty var name: String,
    @BeanProperty var userEmail: String) extends Serializable {

  def this() {
    this("NA", "NA")
  }
}

The variable names should be the same as the column names in the Cassandra
table. If we use a case class instead of a plain class, the connector will use
the CassandraScalaProductSinkBuilder, which requires a query. So we write the
Scala class as a POJO class.
Scala classes should be handled separately in the connector; I will request a
feature for that. Until then, this is a workaround for Scala developers.

On Thu, Dec 28, 2017 at 11:28 AM, Michael Fong 
wrote:

> Hi, shashank agarwal
> 
>
>
> Not sure if I can fully answer your question, but after digging through some
> code, I am not sure the C* connector fully supports Scala case classes + CQL
> data mapping at the moment. I may be totally wrong, and you would need to ask
> the Flink devs about this. However, I have some toy examples that you could
> check out, which use CassandraScalaProductSinkBuilder + a predefined CQL
> query + an entity. I am not using a Scala case class, so they may not fit your
> need.
>
> You can find the example snippets at
> https://github.com/mcfongtw/flink-cassandra-connector-examples/
>
> Regards,
>
> On Thu, Dec 28, 2017 at 1:11 PM, Michael Fong 
> wrote:
>
>> Hi, shashank agarwal
>>
>>
>> AFAIK, on the Java side, for a POJO data type you don't need to set a query,
>> since the CQL data mapping takes care of that, whereas when dealing with
>> Java tuples you do need to provide an upsert query so that Cassandra knows
>> what to insert into the table.
>> The Scala tuple case is clear, same as Java: provide a CQL query; however,
>> I don't know what's up with the Scala POJO (class) case, though...
>>
>> Regards,
>>
>> Michael
>>
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


Re: keyby() issue

2017-12-28 Thread Ufuk Celebi
Hey Jinhua,

On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo  wrote:
> keyBy() on the field would generate a unique key per field value, so if the
> number of unique values is huge, Flink would have trouble both on CPU and
> memory. Is this considered in the design of Flink?

Yes, keyBy hash partitions the data across the nodes of your Flink
application and thus you can easily scale your application up if you
need more processing power.

I'm not sure that this is the problem in your case though. Can you
provide some more details what you are doing exactly? Are you
aggregating by time (for the keyBy you mention no windowing, but then
you mention windowAll)? What kind of aggregation are you doing? If
possible, feel free to share some code.

> Since windowAll() could have its parallelism set, I tried to use a key
> selector that uses the field's hash rather than its value, hoping it would
> decrease the number of keys, but Flink throws a key out-of-range exception.
> How do I use a key selector in the correct way?

Can you paste the exact exception you get? I think this might indicate
that you don't correctly extract the key from your record, e.g. you
extract a different key on sender and receiver.

I'm sure we can figure this out after you provide more context. :-)

– Ufuk
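
For context, the kind of pipeline being discussed here (keyBy on a field plus a
per-key time window aggregation) might look roughly like the sketch below; the
"MyEvent" type and its fields are assumptions based on the snippets in this
thread:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PerIpAggregationSketch {

    // Hypothetical event type, modeled after the "MyEvent" used in this thread.
    public static class MyEvent {
        public String ip;
        public long count;
    }

    // Keys by the IP field (one key, and one window instance per minute, per
    // distinct address) and sums the counter over 1-minute tumbling windows.
    public static DataStream<MyEvent> aggregatePerIp(DataStream<MyEvent> events) {
        return events
                .keyBy(new KeySelector<MyEvent, String>() {
                    @Override
                    public String getKey(MyEvent ev) {
                        return ev.ip;
                    }
                })
                .timeWindow(Time.minutes(1))
                .reduce(new ReduceFunction<MyEvent>() {
                    @Override
                    public MyEvent reduce(MyEvent a, MyEvent b) {
                        a.count += b.count;
                        return a;
                    }
                });
    }
}

One design note: a KeySelector must be deterministic, i.e. it must return the
same key for the same record on every invocation.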


Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-28 Thread Ufuk Celebi
On Thu, Dec 28, 2017 at 12:11 AM, Hao Sun  wrote:
> Thanks! Great to know I do not have to worry about duplicates inside Flink.
>
> One more question: why does this happen? Because the TM and the JM check
> leadership at different intervals?

Yes, it's not deterministic how this happens. There will also be cases
when the JM notices before the TM.


keyby() issue

2017-12-28 Thread Jinhua Luo
Hi All,

I need to aggregate some fields of the events. At first I used keyBy(), but I
found that Flink performs very slowly (it even stops producing results) because
the number of keys is around half a million per minute. So I used windowAll()
instead, and Flink then works as expected.

keyBy() on the field would generate a unique key per field value, so if the
number of unique values is huge, Flink would have trouble both on CPU and
memory. Is this considered in the design of Flink?

Since windowAll() could have its parallelism set, I tried to use a key selector
that uses the field's hash rather than its value, hoping it would decrease the
number of keys, but Flink throws a key out-of-range exception. How do I use a
key selector in the correct way?

In Storm, we could achieve this goal with ease: use fieldGrouping to
connect the spout and bolt.