Re: Netty channel closed at AKKA gated status

2019-04-19 Thread Wenrui Meng
I looked at a few similar instances. The lost task manager was indeed no longer
active, since no log lines from that task manager were printed after the
timestamp of the connection issue. I guess the task manager somehow died silently,
without logging any exception or termination-related information. I double
checked the lost task manager's host: GC, CPU, memory, network, and disk I/O
all look good, without any spikes. Is there any other way that the
task manager could have been terminated? We run our jobs on a YARN cluster.

On Mon, Apr 15, 2019 at 10:47 PM zhijiang 
wrote:

> Hi Wenrui,
>
> You might further check whether there is a network connection issue
> between the job master and the target task executor, if you confirm the target
> task executor is still alive.
>
> Best,
> Zhijiang
>
> --
> From:Biao Liu 
> Send Time: Tue, Apr 16, 2019, 10:14
> To:Wenrui Meng 
> Cc:zhijiang ; user ;
> tzulitai 
> Subject:Re: Netty channel closed at AKKA gated status
>
> Hi Wenrui,
> If a task manager is killed (kill -9), it has no chance to log
> anything. If the task manager exits because of a connection timeout, there would
> be something in its log file. So it was probably killed by another user or by the
> operating system. Please check the operating system log. BTW, I don't
> think the "DEBUG log level" would help.
>
> Wenrui Meng wrote on Tue, Apr 16, 2019 at 9:16 AM:
> There is no exception or warning in the log of task manager
> `'athena592-phx2/10.80.118.166:44177'`. In addition, the host was not
> shut down either, according to the cluster monitoring dashboard. It probably requires
> turning on DEBUG logging to get more useful information. If the task manager gets
> killed, I assume there will be a termination entry in the task manager log. If
> not, I don't know how to figure out whether it's due to the task manager being
> killed or just a connection timeout.
>
>
>
> On Sun, Apr 14, 2019 at 7:22 PM zhijiang 
> wrote:
> Hi Wenrui,
>
> I think the Akka gated issue and the inactive Netty channel are both caused by
> a task manager exiting or being killed. You should double-check the status of
> this task manager `'athena592-phx2/10.80.118.166:44177'` and the reason it went away.
>
> Best,
> Zhijiang
> --
> From:Wenrui Meng 
> Send Time: Sat, Apr 13, 2019, 01:01
> To:user 
> Cc:tzulitai 
> Subject:Netty channel closed at AKKA gated status
>
> We encountered the Netty channel-inactive issue while Akka had gated that
> task manager. I'm wondering whether the channel was closed because of the Akka
> gated status, since all messages to the task manager are dropped at that
> moment, which might cause a Netty channel exception. If so, should we have
> coordination between Akka and Netty? The gated status is not intended to
> fail the system. Here is the stack trace of the exception:
>
> 2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 3758 (3788228399 bytes in 5967 ms).
> 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-akka.remote.default-remote-dispatcher-25 - Association with remote
> system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now
> gated for [5000] ms. Reason: [Disassociated]
> 2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-akka.remote.default-remote-dispatcher-25 - Association with remote
> system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now
> gated for [5000] ms. Reason: [Disassociated]
> 2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - id (14/96)
> (93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED.
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager 'athena592-phx2/
> 10.80.118.166:44177'. This might indicate that the remote task manager
> was lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
> at
> 

Re: Create Dynamic data type

2019-04-19 Thread Rong Rong
Hi Soheil,

If I understand correctly, when you said "according to the number of rows",
you were trying to dynamically determine the RowType based on how long one
row is, correct?
In this case, I am not sure this is supported in JDBCInputFormat at the
moment, and it would be hard to support.

Even if we extended the JDBCInputFormat to dynamically call
Connection.setSchema() every time you consume a row, this would still be
tricky because:
1. With your "SELECT *", you won't be able to know how long the row is until
you actually execute the statement, but you have to set the schema before you
prepare the statement.
2. You have to prepare the statement every time the schema changes.

You might be able to set all fields to just *GenericTypeInfo<>(Object.class)*
and convert them downstream. This gets around the dynamic schema, but
you still need to know the length of your select beforehand.

So, the best I can think of is to change your schema into maps or arrays of
Strings and Ints, or to write your own SourceFunction that consumes and
deserializes the data in your own way.
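
For what it's worth, here is a rough, untested sketch of the GenericTypeInfo
workaround mentioned above. It reuses the builder calls from your snippet and
assumes you already know the column count of the SELECT (numCols below is a
made-up variable):

// Every column is typed as a generic Object; you have to cast/convert downstream.
int numCols = 11; // assumed to be known beforehand: 1 int column + 5 String + 5 double
TypeInformation<?>[] fieldTypes = new TypeInformation<?>[numCols];
for (int i = 0; i < numCols; i++) {
    fieldTypes[i] = new GenericTypeInfo<>(Object.class);
}

JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("url")
        .setUsername("root")
        .setPassword("pass")
        .setQuery("SELECT * FROM table;")
        .setFetchSize(100)
        .setRowTypeInfo(new RowTypeInfo(fieldTypes))
        .finish();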

Thanks,
Rong



On Fri, Apr 19, 2019 at 8:19 AM Soheil Pourbafrani 
wrote:

> Hi,
>
> Using JDBCInputFormat I want to read data from a database, but the problem is
> that the table columns are dynamic, varying with the number of rows. In the
> schema, the first column is of type int; of the remaining columns, the
> first half are String and the second half are double. So I need a way to
> create the data type dynamically.
>
> I tried the following:
>
> Tuple t = Tuple.getTupleClass(num_col + 1).newInstance();
> t.setField("Integer", 0);
> for(int i = 0; i < num_col; i++) {
> if (i < i / 2) t.setField("String", i);
> if (i > i / 2) t.setField("Double", i);
> }
>
>
> JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername("com.mysql.jdbc.Driver")
> .setDBUrl("url")
> .setUsername("root")
> .setPassword("pass")
> .setQuery("SELECT * FROM table;")
> .setFetchSize(100)
> .setRowTypeInfo(new RowTypeInfo(TypeExtractor.getForObject(t)))
> .finish();
>
> but I got the following error:
>
> Automatic type extraction is not possible on candidates with null values.
> Please specify the types directly.
>
> Creating the data type using TypeInformation[] fieldTypes, I can successfully
> get the data, but it requires a static schema and doesn't fit my case!
>
> Any help will be appreciated!
>


RichAsyncFunction Timer Service

2019-04-19 Thread Mikhail Pryakhin
Hello, Flink community!

It happens that I need to access a timer service in a RichAsyncFunction
implementation. I know this is normally accomplished via the StreamingRuntimeContext
instance available in a RichFunction, but unfortunately, RichAsyncFunction
(extending RichFunction) overrides the "setRuntimeContext" method [1], wrapping the
RuntimeContext instance passed as the method argument in a
RichAsyncFunctionRuntimeContext instance [2]. This RichAsyncFunction-specific
RuntimeContext implementation is private [2], which makes it infeasible to access
the wrapped original RuntimeContext and thus impossible to leverage the timer
service in RichAsyncFunction implementations. Just curious: is there any reason
for that? Could we make this implementation public or somehow expose the wrapped
instance?

Many thanks in advance!

[1] 
https://github.com/apache/flink/blob/c96a4d7afe379a291cc538ca36af896df8dc2127/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java#L76
[2] 
https://github.com/apache/flink/blob/c96a4d7afe379a291cc538ca36af896df8dc2127/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java#L100
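
For context, the kind of workaround I'm considering looks like the sketch below:
overriding setRuntimeContext in my own subclass to keep a reference to the original
context before it gets wrapped. This is only a sketch and assumes setRuntimeContext
is not final in this version, which is part of what I'd like to clarify:

// Rough sketch only: capture the original RuntimeContext before RichAsyncFunction
// wraps it into its private RichAsyncFunctionRuntimeContext.
public abstract class ContextCapturingAsyncFunction<IN, OUT> extends RichAsyncFunction<IN, OUT> {

    // The unwrapped context handed in by the runtime (expected to be a StreamingRuntimeContext).
    protected transient RuntimeContext originalContext;

    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.originalContext = runtimeContext;
        // Let RichAsyncFunction install its own restricted wrapper as usual.
        super.setRuntimeContext(runtimeContext);
    }
}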



Kind Regards,
Mike Pryakhin





Create Dynamic data type

2019-04-19 Thread Soheil Pourbafrani
Hi,

Using JDBCInputFormat I want to read data from a database, but the problem is
that the table columns are dynamic, varying with the number of rows. In the
schema, the first column is of type int; of the remaining columns, the
first half are String and the second half are double. So I need a way to
create the data type dynamically.

I tried the following:

Tuple t = Tuple.getTupleClass(num_col + 1).newInstance();
t.setField("Integer", 0);
for(int i = 0; i < num_col; i++) {
if (i < i / 2) t.setField("String", i);
if (i > i / 2) t.setField("Double", i);
}


JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("url")
.setUsername("root")
.setPassword("pass")
.setQuery("SELECT * FROM table;")
.setFetchSize(100)
.setRowTypeInfo(new RowTypeInfo(TypeExtractor.getForObject(t)))
.finish();

but I got the following error:

Automatic type extraction is not possible on candidates with null values.
Please specify the types directly.

Creating the data type using TypeInformation[] fieldTypes, I can successfully
get the data, but it requires a static schema and doesn't fit my case!

Any help will be appreciated!


Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-19 Thread an0
Hi,

First of all, thank you for the `shuffle()` tip. It works. However, I still 
don't understand why it doesn't work without calling `shuffle()`. 

Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips? All
the trips have keys and timestamps. As I said in my reply to Paul, I see the
same watermarks being extracted.

How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
matter? My understanding is that any specific window for a specific key always
receives exactly the same data, and the calling order of
`assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.

To make `keyBy` as irrelevant as possible, I tried letting it always return the 
same key so that there is only 1 keyed stream and it is exactly the same as the 
original unkeyed stream. It still doesn't trigger windows:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
DataStream<Trip> featurizedUserTrips = userTrips
    .map(trip -> trip)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
        @Override
        public long extractTimestamp(Trip trip) {
            return trip.endTime.getTime();
        }
    });
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```
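
For reference, this is roughly what the working variant looks like on my side, with the `shuffle()` you suggested inserted right before the timestamp assigner:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
DataStream<Trip> featurizedUserTrips = userTrips
    .map(trip -> trip)
    .shuffle() // redistribute elements so every timestamp extractor instance receives data
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
        @Override
        public long extractTimestamp(Trip trip) {
            return trip.endTime.getTime();
        }
    });
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```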

It makes no sense to me. Please help me understand why it doesn't work. Thanks!

On 2019/04/19 04:14:31, Guowei Ma  wrote: 
> Hi,
> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> receive elements (trips). If that is the case, a
> BoundedOutOfOrdernessTimestampExtractor that does not receive any elements
> will not emit watermarks, and so the timeWindowAll operator cannot be
> triggered.
> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> second case and check whether the window is triggered. If it is triggered,
> you could check the distribution of elements produced by the source.
> 
> Best,
> Guowei
> 
> 
> an0...@gmail.com wrote on Fri, Apr 19, 2019 at 4:10 AM:
> 
> > I don't think it is the watermark. I see the same watermarks from the two
> > versions of code.
> >
> > The processing on the keyed stream doesn't change event time at all. I can
> > simply change my code to use `map` on the keyed stream to return the
> > input data unchanged, so that the window operator receives exactly the same data. The
> > only difference is when I do `assignTimestampsAndWatermarks`. The result is
> > the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> > ```java
> > DataStream trips =
> > env.addSource(consumer).assignTimestampsAndWatermarks(new
> > BoundedOutOfOrdernessTimestampExtractor(Time.days(1)) {
> > @Override
> > public long extractTimestamp(Trip trip) {
> > return trip.endTime.getTime();
> > }
> > });
> > KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream featurizedUserTrips = userTrips.map(trip -> trip);
> > AllWindowedStream windowedUserTrips =
> > featurizedUserTrips.timeWindowAll(Time.days(7),
> > Time.days(1));
> > ```
> >
> > `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > ```java
> > DataStream trips = env.addSource(consumer);
> > KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream featurizedUserTrips =
> > userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
> > BoundedOutOfOrdernessTimestampExtractor(Time.days(1)) {
> > @Override
> > public long extractTimestamp(Trip trip) {
> > return trip.endTime.getTime();
> > }
> > });
> > AllWindowedStream windowedUserTrips =
> > featurizedUserTrips.timeWindowAll(Time.days(7),
> > Time.days(1));
> > ```
> >
> > It feels a bug to me, but I want to confirm it before I file the bug
> > report.
> >
> > On 2019/04/18 03:38:34, Paul Lam  wrote:
> > > Hi,
> > >
> > > Could you check the watermark of the window operator? One possible
> > situation would be that some of the keys are not getting enough input, so their
> > watermarks remain below the window end time and hold the window operator's
> > watermark back. IMO, it's good practice to assign watermarks earlier in
> > the data pipeline.
> > >
> > > Best,
> > > Paul Lam
> > >
> > > > On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> > > >
> > > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > ```java
> > > > DataStream trips =
> > > >env.addSource(consumer).assignTimestampsAndWatermarks(new
> > BoundedOutOfOrdernessTimestampExtractor(Time.days(1)) {
> > > >@Override
> > > >public long extractTimestamp(Trip trip) {
> > > >return trip.endTime.getTime();
> > > >}
> > > >});
> > > > KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream featurizedUserTrips = userTrips.process(new
> > Featurization());
> > > > AllWindowedStream windowedUserTrips =
> > > >featurizedUserTrips.timeWindowAll(Time.days(7),
> > > >

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

2019-04-19 Thread Oytun Tez
Forgot to answer one of your points: the parent class compiles well without
this CEP selector (the one with the timeout signature)...

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez  wrote:

> Hey JingsongLee!
>
> Here are some findings...
>
>    - flatSelect *without timeout* works normally:
>    patternStream.flatSelect(PatternFlatSelectFunction) compiles well.
>    - Converted both the timeout and select selectors to an *inner class*
>    (not static): same result, doesn't compile.
>    - flatSelect *without* timeout, but with an inner class for
>    PatternFlatSelectFunction: compiles (same as the first bullet).
>    - Tried both of these selectors with an *empty* body, just a skeleton
>    class: doesn't compile either. An empty-body example is in my first email.
>    - Tried making both selectors *static public inner* classes: doesn't
>    compile either.
>    - Extracted both the timeout and flat selectors into their own *independent
>    classes* in separate files: doesn't compile.
>    - I am putting the *error stack* below.
>    - Without the timeout selector in any class or lambda shape, with an empty
>    or full body, flatSelect compiles well.
>
> Would these findings help? Any ideas?
>
> Here is an error stack:
>
> 09:36:51,925 ERROR
> com.motaword.ipm.kernel.error.controller.ExceptionHandler -
> org.apache.flink.api.common.InvalidProgramException: The implementation of
> the PatternFlatSelectAdapter is not serializable. The object probably
> contains or references non serializable fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
> at
> org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
> at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
> at
> com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
> at
> com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
> at
> com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
> at com.motaword.ipm.kernel.Application.main(Application.java:63)
> Caused by: java.io.NotSerializableException:
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> ... 9 more
>
>
>
>
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Fri, Apr 19, 2019 at 3:14 AM JingsongLee 
> wrote:
>
>> Hi @Oytun Tez
>> It looks like your *PatternFlatSelectFunction* is not serializable.
>> Because you use an anonymous inner class,
>> check the class that getPending belongs to; maybe that class is not
>> serializable?
>>
>> Or you may want to avoid inner classes and instead use a static
>> nested class.
>>
>> Best, JingsongLee
>>
>> --
>> From:Oytun Tez 
>> Send Time: Fri, Apr 19, 2019, 03:38
>> To:user 
>> Subject:PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
>>
>> Hi all,
>>
>> We are just migrating from 1.6 to 1.8. I encountered a serialization
>> error which, if memory serves, we didn't have before: The implementation
>> of the *PatternFlatSelectAdapter* is not serializable. The object
>> probably contains or references non serializable fields.
>>
>> The method below 

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

2019-04-19 Thread Oytun Tez
Hey JingsongLee!

Here are some findings...

   - flatSelect *without timeout* works normally:
   patternStream.flatSelect(PatternFlatSelectFunction) compiles well.
   - Converted both the timeout and select selectors to an *inner class*
   (not static): same result, doesn't compile.
   - flatSelect *without* timeout, but with an inner class for
   PatternFlatSelectFunction: compiles (same as the first bullet).
   - Tried both of these selectors with an *empty* body, just a skeleton
   class: doesn't compile either. An empty-body example is in my first email.
   - Tried making both selectors *static public inner* classes: doesn't
   compile either.
   - Extracted both the timeout and flat selectors into their own *independent
   classes* in separate files: doesn't compile.
   - I am putting the *error stack* below.
   - Without the timeout selector in any class or lambda shape, with an empty
   or full body, flatSelect compiles well.

Would these findings help? Any ideas?

Here is an error stack:

09:36:51,925 ERROR
com.motaword.ipm.kernel.error.controller.ExceptionHandler -
org.apache.flink.api.common.InvalidProgramException: The implementation of
the PatternFlatSelectAdapter is not serializable. The object probably
contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
at
org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
at
com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
at
com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
at
com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
at com.motaword.ipm.kernel.Application.main(Application.java:63)
Caused by: java.io.NotSerializableException:
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 9 more






---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Fri, Apr 19, 2019 at 3:14 AM JingsongLee  wrote:

> Hi @Oytun Tez
> It looks like your *PatternFlatSelectFunction* is not serializable.
> Because you use an anonymous inner class,
> check the class that getPending belongs to; maybe that class is not
> serializable?
>
> Or you may want to avoid inner classes and instead use a static
> nested class.
>
> Best, JingsongLee
>
> --
> From:Oytun Tez 
> Send Time:2019年4月19日(星期五) 03:38
> To:user 
> Subject:PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
>
> Hi all,
>
> We are just migrating from 1.6 to 1.8. I encountered a serialization error
> which, if memory serves, we didn't have before: The implementation of the
> *PatternFlatSelectAdapter* is not serializable. The object probably
> contains or references non serializable fields.
>
> The method below simply takes a PatternStream from CEP.pattern() and
> makes use of the side output for timed-out events. Can you see anything
> weird here (WorkerEvent is the input, but the collectors collect Project
> objects)?
>
> protected DataStream<Project> getPending(PatternStream<WorkerEvent>
> patternStream) {
> OutputTag<Project> pendingProjectsTag = new *OutputTag<Project>*
> ("invitation-pending-projects"){};
>
> return patternStream.*flatSelect*(
> pendingProjectsTag,
>

TM occasionally hang in deploying state in Flink 1.5

2019-04-19 Thread qi luo
Hi all,

We use Flink 1.5 for batch jobs and start thousands of jobs per day. Occasionally we
observe stuck jobs caused by a TM hanging in the "DEPLOYING" state.

Checking the TM log shows that the TM is stuck downloading jars via the BlobClient:


...
INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   - Received task 
DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) 
(184/2000).
INFO  org.apache.flink.runtime.taskmanager.Task - 
DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) 
(184/2000) switched from CREATED to DEPLOYING.
INFO  org.apache.flink.runtime.taskmanager.Task - Creating 
FileSystem stream leak safety net for task DataSource (at 
createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) [DEPLOYING]
INFO  org.apache.flink.runtime.taskmanager.Task - Loading 
JAR files for task DataSource (at createInput(ExecutionEnvironment.java:548) 
(our.code)) (184/2000) [DEPLOYING].
INFO  org.apache.flink.runtime.blob.BlobClient  - 
Downloading 
19e65c0caa41f264f9ffe4ca2a48a434/p-3ecd6341bf97d5512b14c93f6c9f51f682b6db26-37d5e69d156ee00a924c1ebff0c0d280
 from some-host-ip-port

no more logs...


It seems that the TM is calling the BlobClient to download jars from the JM/BlobServer.
Under the hood it calls Socket.connect() and then Socket.read() to retrieve the
results.

Should we add timeouts to the socket operations in BlobClient to resolve this issue?
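
For illustration, a minimal sketch of what I mean, using a plain java.net.Socket
with an explicit connect timeout and SO_TIMEOUT on reads so that a dead peer cannot
block forever (the host, port, and timeout values below are just placeholders):

import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class TimeoutSocketSketch {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket()) {
            // Fail fast if the blob server cannot be reached at all.
            socket.connect(new InetSocketAddress("blob-server-host", 50100), 10_000);
            // Fail the read instead of hanging if the server stops sending data.
            socket.setSoTimeout(60_000);
            try (InputStream in = socket.getInputStream()) {
                byte[] buffer = new byte[4096];
                int bytesRead;
                while ((bytesRead = in.read(buffer)) != -1) {
                    // consume the downloaded blob bytes here
                }
            }
        }
    }
}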

Thanks,
Qi

RE: kafka partitions, data locality

2019-04-19 Thread Smirnov Sergey Vladimirovich (39833)
Hi Ken,

It's a bad story for us: even for a small window we have tens of thousands of
events per job, with 10x peaks or even more, and the number of jobs is known to be
high. So instead of N operations (our producer/consumer mechanism), with
shuffling/resorting (the current Flink implementation) it becomes N*ln(N), a tenfold
loss of execution speed!
For all: what should my next step be? Contribute to Apache Flink? File an issue in the backlog?


With best regards,
Sergey
From: Ken Krugler [mailto:kkrugler_li...@transpac.com]
Sent: Wednesday, April 17, 2019 9:23 PM
To: Smirnov Sergey Vladimirovich (39833) 
Subject: Re: kafka partitions, data locality

Hi Sergey,

As you surmised, once you do a keyBy/max on the Kafka topic to group by
clientId and find the max, the topology will have a partition/shuffle in
it.

This is because Flink doesn’t know that client ids don’t span Kafka partitions.

I don't know of any way to tell Flink that the data doesn't need to be
shuffled. There was a discussion about adding a keyByWithoutPartitioning a
while back, but I don't think that support was ever added.

A simple ProcessFunction with MapState (clientId -> max) should allow you to do
the same thing without too much custom code. In order to support windowing,
you'd use triggers to flush state/emit results.
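
To make that concrete, here's a rough, untested sketch. Transaction and its
getKafkaPartition()/getClientId()/getAmount() accessors are made-up names; it keys
by the Kafka partition so client ids stay grouped on one subtask, keeps a per-client
max in MapState, and uses a processing-time timer as a simple trigger to flush
results once a minute:

public class MaxPerClientFunction
        extends KeyedProcessFunction<Integer, Transaction, Tuple2<Long, Double>> {

    private transient MapState<Long, Double> maxPerClient; // clientId -> max amount seen

    @Override
    public void open(Configuration parameters) {
        maxPerClient = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("maxPerClient", Long.class, Double.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx, Collector<Tuple2<Long, Double>> out)
            throws Exception {
        Double currentMax = maxPerClient.get(tx.getClientId());
        if (currentMax == null || tx.getAmount() > currentMax) {
            maxPerClient.put(tx.getClientId(), tx.getAmount());
        }
        // Simple "trigger": flush all per-client maxes at the next minute boundary.
        long nextFlush = (ctx.timerService().currentProcessingTime() / 60_000 + 1) * 60_000;
        ctx.timerService().registerProcessingTimeTimer(nextFlush);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Long, Double>> out)
            throws Exception {
        for (Map.Entry<Long, Double> entry : maxPerClient.entries()) {
            out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }
}

// Usage: keep data grouped by its Kafka partition instead of re-keying by clientId.
// DataStream<Tuple2<Long, Double>> maxes =
//     transactions.keyBy(Transaction::getKafkaPartition).process(new MaxPerClientFunction());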

— Ken


On Apr 17, 2019, at 2:33 AM, Smirnov Sergey Vladimirovich (39833)
<s.smirn...@tinkoff.ru> wrote:

Hello,

We are planning to use Apache Flink as a core component of our new streaming system
for internal processes (finance, banking business) based on Apache Kafka.
So we are starting some research with Apache Flink, and one of the questions that
arises during that work is how Flink handles data locality.
I'll try to explain: suppose we have a Kafka topic with some kind of events,
and these events are grouped by topic partition so that the handler (or job
worker) consuming messages from a partition has all the information necessary for
further processing.
As an example, say we have clients' payment transactions in a Kafka topic. We
group by clientId (transactions with the same clientId go to the same Kafka
topic partition), and the task is to find the max transaction per client in sliding
windows. In map/reduce terms there is no need to shuffle data between all
topic consumers; it may be worth doing within each consumer to gain some
speedup by increasing the number of executors working on each partition's data.
And my question is how Flink will behave in this case. Does it shuffle all the data,
or does it have settings to avoid these extra, unnecessary shuffle/sorting
operations?
Thanks in advance!


With best regards,
Sergey Smirnov

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-19 Thread Jeff Zhang
>>>  The ExecutionEnvironment is usually used by the user who writes the
code and this person (I assume) would not be really interested in these
callbacks.

Usually the ExecutionEnvironment is used by the user who writes the code, but it
doesn't need to be created and configured by that person. E.g. in a Zeppelin
notebook, the ExecutionEnvironment is created by Zeppelin; the user just uses the
ExecutionEnvironment to write the Flink program. You are right that the end
user would not be interested in these callbacks, but a third-party library
that integrates with Zeppelin would be.

>>> In your case, it could be sufficient to offer some hooks for the
ClusterClient or being able to provide a custom ClusterClient.

Actually in my initial PR (https://github.com/apache/flink/pull/8190), I do
pass the JobListener to the ClusterClient and invoke it there.
But IMHO, ClusterClient is not supposed to be a public API for users. Instead,
JobClient is the public API that users should use to control jobs. So adding
hooks to ClusterClient directly, or providing a custom ClusterClient, doesn't
make sense to me. IIUC, you are suggesting the following approach:
 env.getClusterClient().addJobListener(jobListener)
but I don't see its benefit compared to this:
 env.addJobListener(jobListener)

Overall, I think adding hooks is orthogonal to fine-grained job control.
And I agree that we should refactor the Flink client component, but I don't
think it would affect the JobListener interface. What do you think?
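
To make the intended usage concrete, here is a rough sketch of how a third-party
integration like Zeppelin could use the proposed hook (this API is only a proposal
in FLINK-12214, not something that exists in Flink today, so treat it as pseudocode):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.addJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobID jobId) {
        // e.g. associate the jobId with the notebook paragraph that submitted it
    }

    @Override
    public void onJobExecuted(JobExecutionResult jobResult) {
        // e.g. mark the paragraph as finished and surface accumulator results
    }

    @Override
    public void onJobCanceled(JobID jobId, String savepointPath) {
        // e.g. mark the paragraph as cancelled and record the savepoint path
    }
});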




Till Rohrmann wrote on Thu, Apr 18, 2019 at 8:57 PM:

> Thanks for starting this discussion Jeff. I can see the need for
> additional hooks for third party integrations.
>
> The thing I'm wondering is whether we really need/want to expose a
> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
> usually used by the user who writes the code and this person (I assume)
> would not be really interested in these callbacks. If he would, then one
> should rather think about a better programmatic job control where the
> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
> Moreover, we would effectively make this part of the public API and every
> implementation would need to offer it.
>
> In your case, it could be sufficient to offer some hooks for the
> ClusterClient or being able to provide a custom ClusterClient. The
> ClusterClient is the component responsible for the job submission and
> retrieval of the job result and, hence, would be able to signal when a job
> has been submitted or completed.
>
> Cheers,
> Till
>
> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>
>> Hi Jeff,
>>
>> I personally like this proposal. From the perspective of programmability,
>> the JobListener would make things much nicer for third-party programs.
>>
>> The scenario where I need the listener is the Flink cube engine for Apache
>> Kylin. In that case, the Flink job program is embedded into Kylin's
>> executable context.
>>
>> If we could have this listener, it would be easier to integrate with
>> Kylin.
>>
>> Best,
>> Vino
>>
>> Jeff Zhang wrote on Thu, Apr 18, 2019 at 1:30 PM:
>>
>>>
>>> Hi All,
>>>
>>> I created FLINK-12214 for adding a
>>> JobListener (hook) to the Flink job lifecycle. Since this is a new public API
>>> for Flink, I'd like to discuss it more widely in the community to get more
>>> feedback.
>>>
>>> The background and motivation is that I am integrating Flink into Apache
>>> Zeppelin (which is a notebook, in case you
>>> don't know), and I'd like to capture some job context (like the jobId) across the
>>> lifecycle of a Flink job (submitted, executed, cancelled) so that I can
>>> manipulate the job with more fine-grained control (e.g. I can capture the jobId
>>> when the job is submitted, then associate it with one paragraph, and when the
>>> user clicks the cancel button, I can call the Flink cancel API to cancel
>>> this job).
>>>
>>> I believe other projects that integrate Flink would need a similar
>>> mechanism. I plan to add an addJobListener API to
>>> ExecutionEnvironment/StreamExecutionEnvironment so that users can add
>>> customized hooks into the Flink job lifecycle.
>>>
>>> Here's draft interface JobListener.
>>>
>>> public interface JobListener {
>>>
>>> void onJobSubmitted(JobID jobId);
>>>
>>> void onJobExecuted(JobExecutionResult jobResult);
>>>
>>> void onJobCanceled(JobID jobId, String savepointPath);
>>> }
>>>
>>> Let me know your comment and concern, thanks.
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>

-- 
Best Regards

Jeff Zhang


Job failure alerting

2019-04-19 Thread 戴嘉诚
Hi all,
  How can code be notified when a job fails (other than polling the REST API in real
time)? Because on YARN, if a job fails at night… by the time we get to work, we can no longer see the corresponding logs and don't know why it failed. What I need is: when a job failure is detected, trigger an external notification from the code, so that we know the job's status in real time.


Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

2019-04-19 Thread JingsongLee
Hi @Oytun Tez
It looks like your PatternFlatSelectFunction is not serializable.
Because you use an anonymous inner class, check the class that getPending
belongs to; maybe that class is not serializable?
Or you may want to avoid inner classes and instead use a static nested
class.
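
For example, something along these lines, a minimal untested sketch using the
WorkerEvent/Project types from your snippet (being static, the classes do not
capture the enclosing instance, so only their own fields need to be serializable):

public static class PendingTimeoutFunction
        implements PatternFlatTimeoutFunction<WorkerEvent, Project> {
    @Override
    public void timeout(Map<String, List<WorkerEvent>> pattern, long timeoutTimestamp,
                        Collector<Project> out) {
        // build and collect Project objects for timed-out partial matches here
    }
}

public static class PendingSelectFunction
        implements PatternFlatSelectFunction<WorkerEvent, Project> {
    @Override
    public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> out) {
        // build and collect Project objects for completed matches here
    }
}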

Best, JingsongLee


--
From:Oytun Tez 
Send Time: Fri, Apr 19, 2019, 03:38
To:user 
Subject:PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Hi all,

We are just migrating from 1.6 to 1.8. I encountered a serialization error
which, if memory serves, we didn't have before: The implementation of the
PatternFlatSelectAdapter is not serializable. The object probably contains or
references non serializable fields.

The method below simply takes a PatternStream from CEP.pattern() and makes
use of the side output for timed-out events. Can you see anything weird here
(WorkerEvent is the input, but the collectors collect Project objects)?

protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
    OutputTag<Project> pendingProjectsTag = new OutputTag<Project>("invitation-pending-projects"){};

    return patternStream.flatSelect(
            pendingProjectsTag,
            new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
                @Override
                public void timeout(Map<String, List<WorkerEvent>> map, long l, Collector<Project> collector) {
                }
            },
            new PatternFlatSelectFunction<WorkerEvent, Project>() {
                @Override
                public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> collector) {
                }
            }
    ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
}

---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com 

metric does not display on web

2019-04-19 Thread sora
Hi all,
I am trying to monitor my Flink application, so I added two metrics to my
application.
But I cannot see any information in the web UI. The metrics tab says "No metrics
available".
Am I missing any configuration?
My Flink version: 1.7.2
My example code:
def main(args: Array[String]) {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

  val text = env.socketTextStream("HOST", )
  val counts = text.flatMap(new RichFlatMapFunction[String, String] {
private var counter: Counter = _
private var meter: org.apache.flink.metrics.Meter = _

override def open(parameters: Configuration): Unit = {
  super.open(parameters)
  counter = getRuntimeContext.getMetricGroup.counter("recordCounter")
  meter = getRuntimeContext.getMetricGroup.meter("recordMeter", new 
DropwizardMeterWrapper(new com.codahale.metrics.Meter()))
}

override def flatMap(value: String, out: Collector[String]): Unit = {
  val result = value.split("\\W+").filter(_.nonEmpty)
  result.foreach(out.collect)
  counter.inc(result.length)
  meter.markEvent(result.length)
}
  })
.map {
  (_, 1L)
}
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.sum(1)
.map {
  _.toString()
}

  counts.addSink(new SocketClientSink[String]("HOST", , new 
SimpleStringSchema))

  env.execute("Scala SocketTextStreamWordCount Example")
}