Re: migrate AbstractStreamOperator from 1.0 to 1.4

2018-03-13 Thread Filippo Balicchia
Hi,
in case anyone is interested: restoreState was removed in
https://issues.apache.org/jira/browse/FLINK-4196, and snapshotOperatorState
was replaced by snapshotState in 1.2.
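
For reference, a minimal sketch of what the replacement hooks look like on a
custom operator in 1.2+ (the state handling inside the methods is only
illustrative; see AbstractStreamOperator for the exact contracts):

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class MyOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // replaces the old restoreState(): read back operator/keyed state here,
        // e.g. via context.getOperatorStateStore() / context.getKeyedStateStore()
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // replaces the old snapshotOperatorState(): state registered through the
        // state stores is snapshotted automatically at this point
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        output.collect(element);
    }
}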

--Filippo

2018-03-12 15:37 GMT+01:00 Filippo Balicchia :

> Hi,
>
> I'm a newbie to Flink and streaming engines, and I'm starting to get
> familiar with the Stream API using examples that create their own
> operators.
> The examples use AbstractStreamOperator at version 1.0, and after updating
> the library to 1.4 I noticed that restoreState, snapshotOperatorState and
> StreamTaskState were removed. Could you please point me to a link or
> document about the deprecation so I can understand how to replace them?
>
> Thanks for help
>
> --Filippo
>


Re: Record Delivery Guarantee with Kafka 1.0.0

2018-03-13 Thread Chirag Dewan
Hi,
Still stuck on this.
My understanding is that this is something Flink can't handle on its own. If the
batch size of the Kafka producer is non-zero (which it ideally should be), there
will be in-memory records and hence data loss in boundary cases. The only way I
can handle this with Flink is via the checkpointing interval, which flushes any
buffered records.
Is my understanding correct here? Or am I still missing something?
thanks,
Chirag
On Monday, 12 March, 2018, 12:59:51 PM IST, Chirag Dewan 
 wrote:  
 
Hi,
I am trying to use the Kafka 0.11 sink with AT_LEAST_ONCE semantics and am
experiencing some data loss on Task Manager failure.
It's a simple job with parallelism=1 and a single Task Manager. After a few
checkpoints (Kafka flushes) I kill one of my Task Managers running as a
container on Docker Swarm.
I observe a small number of records, usually 4-5, being lost on the Kafka
broker (1-broker cluster, 1 topic with 1 partition).
My FlinkKafkaProducer config is as follows:

batch.size=default (16384)
retries=3
max.in.flight.requests.per.connection=1
acks=1

As I understand it, all the messages batched by the KafkaProducer
(RecordAccumulator) in the memory buffer are lost. Is this why I can't see my
records on the broker? Or is there something I am doing terribly wrong? Any
help appreciated.
TIA,
Chirag
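
For context, here is a minimal sketch of the kind of setup being described,
assuming the 0.11 connector from Flink 1.4 (topic name and broker address are
placeholders). AT_LEAST_ONCE only gives its guarantee together with
checkpointing, since outstanding records are flushed on each checkpoint:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class AtLeastOnceSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // the producer is flushed on every checkpoint
        env.enableCheckpointing(10_000L);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
        props.setProperty("retries", "3");
        props.setProperty("max.in.flight.requests.per.connection", "1");

        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
                "my-topic",                                               // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);

        env.fromElements("a", "b", "c")
           .addSink(producer);

        env.execute("at-least-once kafka sink sketch");
    }
}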
    

Re: Share state across operators

2018-03-13 Thread m@xi
Thanks a lot, Timo!

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


flink sql: "slow" performance on Over Window aggregation with time attribute set to event time

2018-03-13 Thread Yan Zhou [FDS Science]
Hi,

I am using Flink SQL in my application. It simply reads records from a Kafka
source, converts them to a table, then runs a query that performs an over
window aggregation for each record. A time-lag watermark assigner with a 10ms
time lag is used.

The performance is not ideal. The end-to-end latency, which is the difference
between the time a record arrives at the Flink source and the time it arrives
at the Flink sink, is around 250ms (median). Please note that my query, which
is an over window aggregation, generates one result for each input record. I
was expecting less than 100ms. I increased the number of queries 100-fold and
still see the same median end-to-end latency with plenty of CPU and memory
available. It seems to me that something is holding my application back.

However, when I use processing time as the time attribute without changing
anything else, the latency drops to 50ms. I understand that in general using
processing time should be faster. But in my event-time test the time lag is
set to only 10ms, which should mean the operators process events almost
immediately after they arrive. And the classes that calculate the over window
aggregation (ProcTimeBoundedRangeOver, RowTimeBoundedRowsOver, etc.) have
basically the same logic. Why can the choice between processing time and
event time make such a big difference in end-to-end latency? And what is
holding my application back when the time attribute is set to event time?

Below is my cluster and application setup and thank you for your time.


The cluster:

The cluster runs in standalone mode with 7 servers. Each server has 24 cores and
240 GB of memory. There is 1 job manager and 6 task managers. Each task manager
is allocated 12 cores, 120 GB of memory and 6 task manager slots. HDFS also runs
over SSD on these servers.


The application:

When an event arrives in Flink from Kafka, an ingestionTs is set for the event by
the application. When the event arrives at the sink, the processing latency is
calculated as System.currentTimeMillis() - ingestionTs. This value is considered
the end-to-end latency, recorded with a histogram metric and viewable in the
Flink web UI. The RocksDB state backend is used. A time-lag watermark assigner
with a time lag of 10ms is used.
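
For reference, a watermark assigner with a fixed 10ms time lag of the kind
described above could look like the sketch below (the Event type and its
eventTs field are assumptions about the application):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical event type with an event-time field, for illustration only.
class Event {
    long eventTs;
}

// Emits watermarks that lag 10ms behind the highest event timestamp seen so far.
class TimeLagWatermarkAssigner extends BoundedOutOfOrdernessTimestampExtractor<Event> {

    TimeLagWatermarkAssigner() {
        super(Time.milliseconds(10));
    }

    @Override
    public long extractTimestamp(Event element) {
        return element.eventTs;
    }
}

It would be attached with events.assignTimestampsAndWatermarks(new TimeLagWatermarkAssigner()).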


Custom Source
-> Flat Map
-> Timestamps/Watermarks
-> (from: (id, ip, type, ingestionTs, eventTs) -> select: (id, ip, type, 
ingestionTs, eventTs))
--HASH-->
over:( PARTITION BY: ip,
ORDER BY: eventTs,
RANGEBETWEEN 8640 PRECEDING AND CURRENT ROW,
select: (id, ip, eventTs, COUNT(*) AS w0$o0), ingestionTs)
-> select: (id, eventTs, w0$o0 AS CNT), ingestionTs)
-> to: Tuple2
-> Sink: Unnamed

select id, eventTs, count(*) over (partition by id order by eventTs range
between interval '24' hour preceding and current row) as cnt1 from myTable




Re: State serialization problem when we add a new field in the object

2018-03-13 Thread Aljoscha Krettek
Hi,

I'm afraid Flink does currently not support changing the schema of state when 
restoring from a savepoint.

Best,
Aljoscha
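
A side note for future state only (it does not help with a savepoint that was
already written with the default serializers): Kryo's CompatibleFieldSerializer
stores field names and therefore tolerates fields being added or removed later,
so one option sometimes used is to register it for the type before snapshots
are taken. A sketch, with User standing in for the class from the thread:

import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoRegistrationSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // CompatibleFieldSerializer writes field names into the serialized data,
        // so state written with it can tolerate added/removed fields later.
        // It must be registered BEFORE the savepoint you later want to restore.
        env.getConfig().registerTypeWithKryoSerializer(User.class, CompatibleFieldSerializer.class);
    }

    // stand-in for the User POJO from the thread
    public static class User {
        private String id;
        private String firstName;
        private String lastName;
    }
}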

> On 13. Mar 2018, at 07:36, kla  wrote:
> 
> Hi guys,
> 
> I have the flink streaming job running (1.2.0 version) which has the
> following state:
> 
> private transient ValueState>> userState;
> 
> With following configuration:
> 
> final ValueStateDescriptor>> descriptor =
>new ValueStateDescriptor<>("userState",
> TypeInformation.of(new UserTypeHint()));
>userState = getRuntimeContext().getState(descriptor);
> And the User class has following:
> 
> public class User {
> 
>private String id;
> 
>private String firstName;
> 
>private String lastName;
> 
> }
> 
> And after some time we tried to add one more field in the user object (for
> example emailAddress). But apparently it didn't work; I am getting the
> following exception:
> 
> 018-03-13 13:26:13,357 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job CountJob
> (cbada55d435571e8b24313196204f8ab) switched from state RUNNING to FAILING.
> com.esotericsoftware.kryo.KryoException:
> java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
> Serialization trace:
> type (com.example.User)
>   at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>   at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>   at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:82)
>   at
> com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:33)
>   at
> com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:27)
>   at
> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
>   at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:242)
>   at
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>   at java.util.ArrayList.get(ArrayList.java:429)
>   at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>   ... 15 more
> 
> 
> Thanks,
> Konstantin
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: PartitionNotFoundException when restarting from checkpoint

2018-03-13 Thread Fabian Hueske
Hi Seth,

Thanks for sharing how you resolved the problem!

The problem might have been related to Flink's key groups which are used to
assign key ranges to tasks.
Not sure why this would be related to ZooKeeper being in a bad state. Maybe
Stefan (in CC) has an idea about the cause.

Also, it would be helpful if you could share the stacktrace of the
exception (in case you still have it).

Best, Fabian

2018-03-13 14:35 GMT+01:00 Seth Wiesman :

> It turns out the issue was due to our zookeeper installation being in a
> bad state. I am not clear enough on flink’s networking internals to explain
> how this manifested as a partition-not-found exception, but hopefully this
> can serve as a starting point for others who run into the same issue.
>
>
>
> *Seth Wiesman*| Software Engineer 4 World Trade Center, 46th Floor, New
> York, NY 10007 swies...@mediamath.com 
>
>
>
>
>
> *From: *Seth Wiesman 
> *Date: *Friday, March 9, 2018 at 11:53 AM
> *To: *"user@flink.apache.org" 
> *Subject: *PartitionNotFoundException when restarting from checkpoint
>
>
>
> Hi,
>
>
>
> We are running Flink 1.4.0 with a YARN deployment on EC2 instances, RocksDB
> and incremental checkpointing. Last night a job failed and became stuck
> in a restart cycle with a PartitionNotFoundException. We tried restarting from
> the checkpoint on a fresh Flink session with no luck. Looking through the logs
> we can see that the specified partition is never registered with the
> ResultPartitionManager.
>
>
>
> My questions are:
>
> 1)  Are partitions a part of state, or are they ephemeral to the job?
>
> 2)  If they are not part of state, where would the task managers be
> getting that partition id to begin with?
>
> 3)  Right now we are logging everything under
> org.apache.flink.runtime.io.network; is there anywhere else to look?
>
>
>
> Thank you,
>
>
>
> *Seth Wiesman*| Software Engineer 4 World Trade Center, 46th Floor, New
> York, NY 10007 swies...@mediamath.com 
>
>
>


Re: Partial aggregation result sink

2018-03-13 Thread Fabian Hueske
Hi,

Chesnay is right.
SQL and the Table API support neither early window results nor allowed
lateness for updating results with late-arriving data.
If you need such features, you should use the DataStream API.

Best, Fabian
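
As a rough illustration of the DataStream-API route with the built-in
ContinuousEventTimeTrigger (the key and value layout below is just an
assumption): every key emits an updated partial sum every 10 minutes of event
time, in addition to the final result when the daily window closes:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

public class EarlyFiringSketch {

    public static DataStream<Tuple2<String, Long>> partialDailySums(
            DataStream<Tuple2<String, Long>> input) {
        return input
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.days(1)))
                // fire (without purging) every 10 minutes of event time
                .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10)))
                .sum(1);
    }
}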


2018-03-13 12:10 GMT+01:00 Chesnay Schepler :

> I don't think you can specify custom triggers when using pure SQL, but
> maybe Fabian or Timo know a SQL way of implementing your goal.
>
>
> On 12.03.2018 13:16, 李玥 wrote:
>
> Hi Chirag,
> Thanks for your reply!
> I found that the provided ContinuousEventTimeTrigger should work in my
> situation.
> Most examples are based on the Table API, like
> ‘ds.keyBy(0).window().trigger(MyTrigger.of())…’,
> but how do I apply the trigger to a pure Flink SQL application?
>
>
>
>
>
>
> On 12 Mar 2018, at 5:20 PM, Chirag Dewan wrote:
>
> Hi LiYue,
>
> This should help : Apache Flink 1.5-SNAPSHOT Documentation: Windows
> 
>
>
>
> So basically you need to register a processing time trigger at every 10
> minutes and on callback, you can FIRE the window result like this:
>
>   @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
>   // schedule next timer
>   ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 1000L);
>   return TriggerResult.FIRE;
> }
>
>
> I hope it helps.
>
> Chirag
>
> On Monday, 12 March, 2018, 2:10:25 PM IST, 李玥 
> wrote:
>
>
> Hi team,
> I’m working on an event-time based aggregation application with Flink
> SQL.  Is there any way to keep sinking partial aggregation results BEFORE
> the time window closes?
> For example, My SQL:
> select …
> from my_table
> GROUP BY TUMBLE(`timestamp`, INTERVAL '1’ DAY),other_column;
> Usually, Flink sinks the aggregation result after the time window has closed. Is
> there any way to keep sinking TODAY’s partial aggregation result every 10
> minutes so we can see today’s performance on my chart?
>
> Thanks!
> LiYue
>
>
>
>


sorting data into sink

2018-03-13 Thread Telco Phone
Does anyone know if this is a correct assumption:

DataStream sorted = stream.keyBy("partition");

Will this automatically send records with the same key to the same sink thread?

The behavior I am seeing is that a sink set up with multiple threads sees data
from the same hour.
Are there any good examples of how to partition data so that each sink thread
only gets the same type of data?
Thanks
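
For what it's worth, a small sketch of the keyBy-before-sink pattern being
asked about (the record type and key field are assumptions): all records with
the same key hash end up in the same parallel sink instance, although one
instance may still receive several different keys.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class KeyedSinkSketch {

    // Hypothetical record type carrying the value it should be partitioned by.
    public static class Record {
        public String partition;
    }

    public static void wire(DataStream<Record> stream) {
        stream
            // all records with the same 'partition' value are hashed to the
            // same downstream subtask...
            .keyBy(new KeySelector<Record, String>() {
                @Override
                public String getKey(Record r) {
                    return r.partition;
                }
            })
            // ...so each parallel instance of this sink only sees a subset of keys
            .addSink(new RichSinkFunction<Record>() {
                @Override
                public void invoke(Record value) {
                    // write the value somewhere
                }
            });
    }
}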



Re: Dependency Injection and Flink

2018-03-13 Thread Steven Wu
Xiaochuan,

We are doing exactly as you described. We keep the injector as a global
static var.

But we extend FlinkJobManager and FlinkTaskManager to override the main
method and initialize the injector (and other things) during JVM startup,
which does cause tight code coupling. It is a little painful to upgrade
Flink because sometimes internal code-structure changes in FlinkJobManager
and FlinkTaskManager can break our extended classes.

Thanks,
Steven
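
A minimal sketch of what such a static holder could look like (lazy
double-checked init with a placeholder module list; Steven's approach
initializes it during JVM startup instead). It matches the MyInjectorHolder
referenced in the quoted code below:

import com.google.inject.Guice;
import com.google.inject.Injector;

public final class MyInjectorHolder {

    private static volatile Injector injector;

    private MyInjectorHolder() {}

    public static Injector injector() {
        if (injector == null) {
            synchronized (MyInjectorHolder.class) {
                if (injector == null) {
                    // add your Guice modules here
                    injector = Guice.createInjector();
                }
            }
        }
        return injector;
    }
}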


On Tue, Mar 13, 2018 at 11:30 AM, XiaoChuan Yu  wrote:

> Hi,
>
> I'm evaluating Flink with the intent to integrate it into a Java project
> that uses a lot of dependency injection via Guice. What would be the best
> way to work with DI/Guice given that injected fields aren't Serializable?
> I looked at this StackOverflow answer so far. To my understanding the
> strategy is as follows but I'm not sure about step 3:
>
>    1. Use a RichFunction any time injection is required.
>    2. Do not use @Inject; instead mark each injected field as transient.
>    3. Implement open() / close() and manually assign values to the injected
>    fields using Injector.getInstance(SomeClass.class)? But where do I get
>    the injector? Create one on the spot each time? Keep one as a static var
>    somewhere and use it everywhere?
>
> Example:
>  public class MyFilter extends RichFilterFunction<String> {
>      private transient DbClient dbClient;
>      //@Inject DbClient dbClient; //typical Guice field injection
>
>      public void open(Configuration parameters) {
>          // where am I supposed to get the injector?
>          // keep it as a static variable somewhere and init it in Main?
>          this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
>      }
>
>      public boolean filter(String value) {
>          return this.dbClient.query(value);
>      }
>  }
> I haven't setup a Flink environment to try the above yet though.
> Does anyone know of a less verbose way?
> I imagine this could get quite verbose with multiple injected fields.
>
> Thanks,
> Xiaochuan Yu
>
>


Dependency Injection and Flink

2018-03-13 Thread XiaoChuan Yu
Hi,

I'm evaluating Flink with the intent to integrate it into a Java project
that uses a lot of dependency injection via Guice. What would be the best
way to work with DI/Guice given that injected fields aren't Serializable?
I looked at this StackOverflow answer so far. To my understanding the
strategy is as follows but I'm not sure about step 3:

   1. Use a RichFunction any time injection is required.
   2. Do not use @Inject; instead mark each injected field as transient.
   3. Implement open() / close() and manually assign values to the injected
   fields using Injector.getInstance(SomeClass.class)? But where do I get the
   injector? Create one on the spot each time? Keep one as a static var
   somewhere and use it everywhere?

Example:
 public class MyFilter extends RichFilterFunction<String> {
     private transient DbClient dbClient;
     //@Inject DbClient dbClient; //typical Guice field injection

     public void open(Configuration parameters) {
         // where am I supposed to get the injector?
         // keep it as a static variable somewhere and init it in Main?
         this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
     }

     public boolean filter(String value) {
         return this.dbClient.query(value);
     }
 }
I haven't setup a Flink environment to try the above yet though.
Does anyone know of a less verbose way?
I imagine this could get quite verbose with multiple injected fields.

Thanks,
Xiaochuan Yu


Re: Extremely large job serialization produced by union operator

2018-03-13 Thread Fabian Hueske
Hi Bill,

The size of the program depends on the number and complexity of the SQL queries
that you are submitting.
Each query might be translated into a sequence of multiple operators. Each
operator has a string with generated code that will be compiled on the
worker nodes. The size of the code depends on the number of fields in the
schema.
Operators and code are not shared across queries.

Best, Fabian

2018-03-09 23:36 GMT+01:00 杨力 :

> Thank you for your response. It occurs both in a standalone cluster and a
> yarn-cluster. I am trying to remove the business code and reproduce it with a
> minimal demo.
>
>
> On Sat, Mar 10, 2018 at 2:27 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Could you provide more details about your queries and setup? Logs could
>> be helpful as well.
>>
>> Piotrek
>>
>> > On 9 Mar 2018, at 11:00, 杨力  wrote:
>> >
>> > I wrote a flink-sql app with following topography.
>> >
>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
>> JDBCAppendTableSink
>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
>> JDBCAppendTableSink
>> > ...
>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map ->
>> JDBCAppendTableSink
>> >
>> > I have a dozen of TableSources And tens of SQLs. As a result, the
>> number of JDBCAppendTableSink times parallelism, that is the number of
>> concurrent connections to database, is too large for the database server to
>> handle. So I tried union DataStreams before connecting them to the
>> TableSink.
>> >
>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>> > \
>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union ->
>> JDBCAppendTableSink
>> > ... /
>> > KafkaJsonTableSource -> SQL -> toAppendStream -> Map
>> >
>> > With this strategy, job submission failed with an
>> OversizedPayloadException of 104 MB. Increasing akka.framesize helps to
>> avoid this exception, but job submission hangs and times out.
>> >
>> > I can't understand why a simple union operator would serialize to such
>> a large message. Can I avoid this problem?
>> > Or can I change some configuration to fix the submission time out?
>> >
>> > Regards,
>> > Bill
>>
>>


Re: Event time join

2018-03-13 Thread Fabian Hueske
Hi,

A Flink application does not have a problem if it ingests two streams with
very different throughput as long as they are somewhat synced on their
event-time.
This is typically the case when ingesting real-time data. In such
scenarios, an application would not buffer more data than necessary.

When reading two streams of historic data with different "density" (events
per time interval) or real-time streams that are off by some time interval,
the application needs to buffer more data to compensate for the difference
in time.
In case of real-time streams that are off by a (more or less) fixed offset,
you should plan for the additional state requirements. Syncing sources to
the same event-time would help in both cases.
However, Flink's RocksDB state backend is also pretty good in handling very
large state sizes due to asynchronous and incremental checkpointing.

The window join functions of the SQL and Table API are implemented using a
CoProcessFunction and so is the new join operator that I pointed to.

Syncing sources is not really related to fault tolerance except that
additional state affects the checkpointing and recovery performance.
Pausing sources can cause problems because watermarks do not advance when
no data is ingested, but again this is not related to fault tolerance.

The credit-based network transfer will be included in Flink 1.5. However,
this is not related to the question discussed here.
It only applies to cases where an operator cannot continue processing, for
example if the function call does not return.
An operator cannot decide to block a particular input and process the other
one.

Long story short.
If you join two streams on event time, you need to buffer the data for the
join window + the event time difference between both streams.

Best, Fabian
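
To make the last point a bit more concrete, here is a heavily simplified sketch
of the buffering pattern with a CoProcessFunction on a keyed two-input stream
(the tuple types, the drift parameter and the join output are assumptions;
state cleanup and the actual join condition are omitted):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingEventTimeJoin
        extends CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String> {

    private final long maxDrift; // maximum expected event-time difference between the streams

    private transient ListState<Tuple2<String, Long>> leftBuffer;
    private transient ListState<Tuple2<String, Long>> rightBuffer;

    public BufferingEventTimeJoin(long maxDrift) {
        this.maxDrift = maxDrift;
    }

    @Override
    public void open(Configuration parameters) {
        TypeInformation<Tuple2<String, Long>> type =
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
        leftBuffer = getRuntimeContext().getListState(new ListStateDescriptor<>("left", type));
        rightBuffer = getRuntimeContext().getListState(new ListStateDescriptor<>("right", type));
    }

    @Override
    public void processElement1(Tuple2<String, Long> value, Context ctx, Collector<String> out)
            throws Exception {
        leftBuffer.add(value);
        // emit once event time has advanced far enough for the other side to have arrived
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + maxDrift);
    }

    @Override
    public void processElement2(Tuple2<String, Long> value, Context ctx, Collector<String> out)
            throws Exception {
        rightBuffer.add(value);
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + maxDrift);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        Iterable<Tuple2<String, Long>> lefts = leftBuffer.get();
        Iterable<Tuple2<String, Long>> rights = rightBuffer.get();
        if (lefts == null || rights == null) {
            return;
        }
        // join everything buffered for this key; a real implementation would also
        // apply the join's time condition and evict state it no longer needs
        for (Tuple2<String, Long> left : lefts) {
            for (Tuple2<String, Long> right : rights) {
                out.collect(left.f0 + " joined: " + left.f1 + " / " + right.f1);
            }
        }
    }
}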


2018-03-09 9:28 GMT+01:00 Gytis Žilinskas :

> Thanks for the answers and discussion both of you.
>
> The FLIP mentions that the cases where one stream is much faster than
> the other one, will not be handled for now either, so I guess it would
> still not solve our problems. As for the join semantics itself, I
> think we achieve the same thing with CoProcessFunction, unless I'm
> missing something.
>
> Anyway, one couple more questions then. It seems weird that this issue
> isn't much more talked about or prioritized. That leads me to believe
> that maybe we're misunderstanding the use case for flink, or maybe
> other users have a different architecture / environment that doesn't
> present them with such problems. Could you describe how it is usually
> used?
>
> From the documentation and talks it looks like fault tolerance is an
> important concept in flink, so a source pausing, or slowing down is
> expected. The way I see it, the only options to deal with it at the
> moment:
>
> 1) have a cluster size that can buffer everything for as long as
> needed and is able to eventually catch up
> 2) model the behaviour so that the streams that are ahead, can go
> through after some cutoff time
>
> do most of the applications just fall into one of these behaviours?
>
> Finally, are there some ideas about extending capabilities of the
> backpressure mechanism that would allow of building some sort of
> functionality, similar to what I was describing in the initial mail.
> With some prioritisation to one of the streams, or other custom
> stalling behaviour. (maybe this credit based approach Vishal mentions?
> The FLIP document is not public, so can't really tell)
>
>
> Thanks again for all the help!
> Gytis
>
> On Thu, Mar 8, 2018 at 7:48 PM, Vishal Santoshi
>  wrote:
> > Yep.  I think this leads to this general question and may be not
> pertinent
> > to https://github.com/apache/flink/pull/5342.  How do we throttle a
> source
> > if the held back data gets unreasonably large ? I know that that is in
> > itself a broader question but delayed watermarks of slow stream
> accentuates
> > the issue . I am curious to know how credit based back pressure handling
> > plays or is that outside the realm of this discussion ? And is credit
> based
> > back pressure handling in 1.5 release.
> >
> > On Thu, Mar 8, 2018 at 12:23 PM, Fabian Hueske 
> wrote:
> >>
> >> The join would not cause backpressure but rather put all events that
> >> cannot be processed yet into state to process them later.
> >> So this works well if the data that is provided by the streams is
> roughly
> >> aligned by event time.
> >>
> >> 2018-03-08 9:04 GMT-08:00 Vishal Santoshi :
> >>>
> >>> Aah we have it here
> >>> https://docs.google.com/document/d/16GMH5VM6JJiWj_
> N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6
> >>>
> >>> On Thu, Mar 8, 2018 at 11:45 AM, Vishal Santoshi
> >>>  wrote:
> 
>  This is very interesting.  I would imagine that there will be high
> back
>  pressure on the LEFT source effectively throttling it but as is the
> current
>  state that is likely effect other pipelines as the free o/p buffer on
> the
>  source side and and i/p bu

State serialization problem when we add a new field in the object

2018-03-13 Thread kla
Hi guys,

I have the flink streaming job running (1.2.0 version) which has the
following state:

private transient ValueState>> userState;

With following configuration:

final ValueStateDescriptor>> descriptor =
new ValueStateDescriptor<>("userState",
TypeInformation.of(new UserTypeHint()));
userState = getRuntimeContext().getState(descriptor);
And the User class has following:

public class User {

private String id;

private String firstName;

private String lastName;

}

And after some time we tried to add one more field in the user object (for
example emailAddress). But apparently it didn't work; I am getting the
following exception:

018-03-13 13:26:13,357 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job CountJob
(cbada55d435571e8b24313196204f8ab) switched from state RUNNING to FAILING.
com.esotericsoftware.kryo.KryoException:
java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
Serialization trace:
type (com.example.User)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:82)
at
com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:33)
at
com.example.TimestampUserCoFlatMapFunction.flatMap1(TimestampUserCoFlatMapFunction.java:27)
at
org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:242)
at
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException: Index: 58, Size: 4
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 15 more


Thanks,
Konstantin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: UUIDs generated by Flink SQL

2018-03-13 Thread Fabian Hueske
Hi Gregory,

Your understanding is correct. It is not possible to assign UUIDs to the
operators generated by the SQL/Table planner.
To be honest, I am not sure whether the use case that you are describing
should be within the scope of the "officially" supported use cases of the API.
It would require in-depth knowledge of the SQL operators' internals, which
is something that we don't want to expose as public API because we want to
keep the freedom to improve the execution code.

Having said that, we have thought about adding the possibility of adjusting
the parallelism of operators.
Similar to assigning UUIDs, this would require an intermediate step between
planning and submission because usually, you don't know the plan that is
generated.
This could be done by generating a representation of a plan that can be
modified before translating it into a DataStream program.

Right now, we don't aim to guarantee backwards compatibility for queries.
Starting a query from a savepoint works if you don't change the query and
flink-table version but might fail as soon as you change either of both.
If you start the same query with a different flink-table version, different
optimization rules or changes in the operators might result in different
states.
If you start a different query, the data types of the state of operators
will most likely have changed.
Coming up with an upgrade strategy for SQL queries is still a major TODO
and there are several ideas how this can be achieved.

Best, Fabian


2018-03-09 0:47 GMT+01:00 Gregory Fee :

> Hello, from what I understand in the documentation it appears there is no
> way to assign UUIDs to operators added to the DAG by Flink SQL. Is my
> understanding correct?
>
> I'd very much like to be able to assign UUIDs to those operators. I want
> to run a program using some Flink SQL, create a save point, and then run
> another program with slightly different structure that picks up from that
> save point. The suggested way of making something like that work in the
> document is to assign UUIDs but that doesn't seem possible if I'm using
> Flink SQL. Any advice?
>
> On a related note, I'm wondering what happens if I have a stateful program
> using Flink SQL and I want to update my Flink binaries. If the query plan
> ends up changing based on that upgrade does it mean that the load of the
> save point is going to fail?
>
> Thanks!
>
> --
> *Gregory Fee*
> Engineer
> 425.830.4734 <+14258304734>
>


Too many open files on Bucketing sink

2018-03-13 Thread galantaa
Hey all,
I'm using bucketing sink with a bucketer that creates partition per customer
per day.
I sink the files to s3.
it suppose to work on around 500 files at the same time (according to my
partitioning).

I have a critical problem of 'Too many open files'.
I've upload two taskmanagers, each with 16 slots. I've checked how many open
files (or file descriptors) exist with 'lsof | wc -l' and it had reached
over a million files on each taskmanager!

after that, I'd decreased the num of taskSlots to 8 (4 in each taskmanager),
and the concurrency dropped.
checking 'lsof | wc -l' gave around 250k file on each machine. 
I also checked how many actual files exist in my tmp dir (it works on the
files there before uploading them to s3) - around 3000.

I think that each taskSlot works with several threads (maybe 16?), and each
thread holds a fd for the actual file, and thats how the numbers get so
high.

Is that a know problem? is there anything I can do?
by now, I filter just 10 customers and it works great, but I have to find a
real solution so I can stream all the data.
Maybe I can also work with a single task Slot per machine but I'm not sure
this is a good idea.

Thank you very much,
Alon 
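
For readers hitting the same issue, a sketch of the BucketingSink settings that
bound how long part files stay open (the path and sizes are placeholders; the
inactive-bucket setters are the ones available in the 1.3+/1.4 connector):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class BucketingSinkTuningSketch {

    public static BucketingSink<String> s3Sink() {
        BucketingSink<String> sink = new BucketingSink<>("s3://my-bucket/output"); // placeholder path
        // roll part files once they reach ~128 MB so they don't stay open forever
        sink.setBatchSize(128 * 1024 * 1024);
        // every minute, close buckets that have not received data for 5 minutes,
        // which releases their file handles
        sink.setInactiveBucketCheckInterval(60 * 1000);
        sink.setInactiveBucketThreshold(5 * 60 * 1000);
        return sink;
    }
}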



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: PartitionNotFoundException when restarting from checkpoint

2018-03-13 Thread Seth Wiesman
It turns out the issue was due to our ZooKeeper installation being in a bad
state. I am not clear enough on Flink’s networking internals to explain how
this manifested as a partition-not-found exception, but hopefully this can
serve as a starting point for others who run into the same issue.


Seth Wiesman| Software Engineer
4 World Trade Center, 46th Floor, New York, NY 
10007
swies...@mediamath.com



From: Seth Wiesman 
Date: Friday, March 9, 2018 at 11:53 AM
To: "user@flink.apache.org" 
Subject: PartitionNotFoundException when restarting from checkpoint

Hi,

We are running Flink 1.4.0 with a YARN deployment on EC2 instances, RocksDB
and incremental checkpointing. Last night a job failed and became stuck in a
restart cycle with a PartitionNotFoundException. We tried restarting from the
checkpoint on a fresh Flink session with no luck. Looking through the logs we
can see that the specified partition is never registered with the
ResultPartitionManager.

My questions are:

1)  Are partitions a part of state, or are they ephemeral to the job?

2)  If they are not part of state, where would the task managers be getting
that partition id to begin with?

3)  Right now we are logging everything under
org.apache.flink.runtime.io.network; is there anywhere else to look?

Thank you,


Seth Wiesman| Software Engineer
4 World Trade Center, 46th Floor, New York, NY 
10007
swies...@mediamath.com




Re: Global Window, Trigger and Watermarks, Parallelism

2018-03-13 Thread dim5b
I have looked into the CEP library. I have posted an issue on
stackoverflow:

https://stackoverflow.com/questions/49047879/global-windows-in-flink-using-custom-triggers-vs-flink-cep-pattern-api

However, the pattern matches all possible solutions on the stream of
events. Does Pattern have a notion of an Evictor? How can you keep only a
specific set of events?

This question has not been answered, and there seems to be a similar question:

https://stackoverflow.com/questions/48028061/flink-cep-greedy-matching

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: HDFS data locality and distribution

2018-03-13 Thread Chesnay Schepler

Hello,

You said that "data is distributed very badly across slots"; do you mean 
that only a small number of subtasks is reading from HDFS, or that the 
keyed data is only processed by a few subtasks?


Flink does prioritize data locality over data distribution when reading
the files, but the function after the groupBy() should still make full
use of the parallelism of the cluster. Do note that data skew can affect
how much data is distributed to each node, i.e. if 80% of your data has
the same key (or rather hash), it will all end up on the same node.


On 12.03.2018 13:49, Reinier Kip wrote:


Relevant versions: Beam 2.1, Flink 1.3.


*From:* Reinier Kip 
*Sent:* 12 March 2018 13:45:47
*To:* user@flink.apache.org
*Subject:* HDFS data locality and distribution

Hey all,


I'm trying to batch-process 30-ish files from HDFS, but I see that 
data is distributed very badly across slots. 4 out of 32 slots get 
4/5ths of the data, another 3 slots get about 1/5th and a last slot 
just a few records. This probably triggers disk spillover on these 
slots and slows down the job immensely. The data has many many unique 
keys and processing could be done in a highly parallel manner. From 
what I understand, HDFS data locality governs which splits are 
assigned to which subtask.



  * I'm running a Beam on Flink on YARN pipeline.
  * I'm reading 30-ish files, whose records are later grouped by
their millions of unique keys.
  * For now, I have 8 task managers by 4 slots. Beam sets all subtasks
to have 32 parallelism.
  * Data seems to be localised to 9 out of the 32 slots, 3 out of the
8 task managers.


Does the statement of input split assignment ring true? Is the fact 
that data isn't redistributed an effort from Flink to have high data 
locality, even if this means disk spillover for a few slots/tms and 
idleness for others? Is there any use for parallelism if work isn't 
distributed anyway?



Thanks for your time, Reinier





Re: Partial aggregation result sink

2018-03-13 Thread Chesnay Schepler
I don't think you can specify custom triggers when using pure SQL, but
maybe Fabian or Timo know a SQL way of implementing your goal.


On 12.03.2018 13:16, 李玥 wrote:

Hi Chirag,
Thanks for your reply!
I found that the provided ContinuousEventTimeTrigger should work in my
situation.
Most examples are based on the Table API, like
‘ds.keyBy(0).window().trigger(MyTrigger.of())…’, but how do I apply the
trigger to a pure Flink SQL application?







On 12 Mar 2018, at 5:20 PM, Chirag Dewan wrote:


Hi LiYue,

This should help : Apache Flink 1.5-SNAPSHOT Documentation: Windows 



So basically you need to register a processing-time timer every 10
minutes and, in the callback, you can FIRE the window result like this:


@Override
public TriggerResult onProcessingTime(long time, TimeWindow window,
        TriggerContext ctx) throws Exception {
    // schedule next timer
    ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 1000L);
    return TriggerResult.FIRE;
}


I hope it helps.

Chirag

On Monday, 12 March, 2018, 2:10:25 PM IST, 李玥 > wrote:



Hi team,
I’m working on an event-time based aggregation application with
Flink SQL.  Is there any way to keep sinking partial aggregation
results BEFORE the time window closes?

For example, My SQL:
select …
from my_table
GROUP BY TUMBLE(`timestamp`, INTERVAL '1’ DAY),other_column;
Usually, Flink sinks the aggregation result after the time window has closed. Is
there any way to keep sinking TODAY’s partial aggregation result every 10
minutes so we can see today’s performance on my chart?


Thanks!
LiYue







Re: Global Window, Trigger and Watermarks, Parallelism

2018-03-13 Thread Chesnay Schepler

Hello,

Event time and watermarks can be used to deal with out-of-order events,
but since you're using global windows (as opposed to time-based windows)
you have to implement the logic for doing this yourself.


Conceptually, what you would have to do is to not create your TripEv 
when receiving a STOP event, but when a given amount of time has passed 
after you received it. Your CreateTrip function would have to scan the 
window contents for START -> STOP sequences and check the timestamp of 
STOP with the current event time. (I think for this you would have to 
extend ProcessWindowFunction instead)
The evictor would have to use the same logic to detect sequences that 
were already processed.


I suggest looking into our CEP library, as this looks like a perfect use case
for it.
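
To make that a bit more tangible, here is a rough sketch of a START -> STOP
pattern over the keyed event stream (the Event stand-in and the trigger codes
are placeholders based on the code quoted below; the selection logic is only
illustrative):

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;

public class TripPatternSketch {

    // trigger codes are placeholders; the quoted code only shows 53 for STOP
    private static final int START = 52;
    private static final int STOP = 53;

    // minimal stand-in for the Event POJO from the question
    public static class Event {
        public long deviceId;
        public int trigger;

        public int getTrigger() {
            return trigger;
        }
    }

    public static DataStream<String> trips(DataStream<Event> events) {
        Pattern<Event, ?> tripPattern = Pattern.<Event>begin("start")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return e.getTrigger() == START;
                    }
                })
                .followedBy("stop")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return e.getTrigger() == STOP;
                    }
                });

        PatternStream<Event> matches = CEP.pattern(events.keyBy("deviceId"), tripPattern);

        return matches.select(new PatternSelectFunction<Event, String>() {
            @Override
            public String select(Map<String, List<Event>> match) {
                Event start = match.get("start").get(0);
                Event stop = match.get("stop").get(0);
                return "trip for device " + start.deviceId + ": " + start.trigger + " -> " + stop.trigger;
            }
        });
    }
}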


On 12.03.2018 14:37, dim5b wrote:

Could someone clarify how exactly event time/watermarks and allowed lateness
work? I have created the program below and I have an input file such as...

   device_id,trigger_id,event_time,messageId
 1,START,1520433909396,1
 1,TRACKING,1520433914398,2
 1,TRACKING,1520433919398,3
 1,STOP,1520433924398,4
 1,START,1520433929398,5
 1,TRACKING,1520433934399,6
 1,TRACKING,1520433939399,7
 1,TRACKING,1520433944399,8
 1,STOP,1520433949399,9

Where trigger_id can be an indicator such as start, tracking or stop. What I
would like to do is group all incoming events by device_id and define
a window based on the trigger_id, i.e. group all events from start until stop
and then do some calculations such as average, max, etc.

I call env.readTextFile("events.csv") and set the StreamTimeCharacteristic to
EventTime. Parallelism is set to 4. This means my messages from the source
file are read but are not in order... (I have added messageId as a counter
only for dev purposes.)

I have defined a custom trigger to find stop events and fire in order to
evict all collected events. My main problem is that if parallelism is
increased beyond 1, the input source reads these out of order.

Shouldn't event time and watermarks resolve this issue? How do I handle
possible out-of-order events?

public class SimpleEventJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(4);
        //env.setParallelism(1);
        DataStream<String> input = env.readTextFile("events.csv");
        // create event stream
        DataStream<Event> events = input.map(new LineToEvent());
        DataStream<Event> waterMarkedStreams =
                events.assignTimestampsAndWatermarks(new EventWAssigner());
        DataStream<TripEv> tripStream = waterMarkedStreams.keyBy("deviceId")
                .window(GlobalWindows.create())
                .trigger(new TripTrigger())
                .evictor(new TripEvictor())
                .apply(new CreateTrip());
        tripStream.print();
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    public static class TripTrigger extends Trigger<Event, GlobalWindow> {
        @Override
        public TriggerResult onElement(Event event, long timestamp,
                GlobalWindow window, TriggerContext context) throws Exception {
            // if this is a stop event, set a timer
            if (event.getTrigger() == 53) {
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window,
                TriggerContext ctx) {
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window,
                TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) {
        }
    }

    private static class TripEvictor implements Evictor<Event, GlobalWindow> {
        @Override
        public void evictBefore(Iterable<TimestampedValue<Event>> events,
                int size, GlobalWindow window, EvictorContext ctx) {
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<Event>> elements, int size,
                GlobalWindow window, EvictorContext ctx) {
            System.out.println(elements);
            long firstStop = Event.earliestStopElement(elements);
            // remove all events up to (and including) the first stop event (which is
            // the event that triggered the window)
 

Flink web UI authentication

2018-03-13 Thread Sampath Bhat
Hello

I would like to know if Flink supports any user-level authentication, such as
username/password, for the Flink web UI.

Regards
Sampath S


Re: What's the best way to clean up the expired rocksdb state

2018-03-13 Thread Chesnay Schepler

Hello,

Yes, I think you'll need to use a ProcessFunction and clean up the state
manually.
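
For illustration, a minimal sketch of that pattern on a keyed stream
(input/output types and the counting logic are placeholders): each key clears
its own state if it has not been updated for an hour.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ExpiringCounter extends ProcessFunction<String, Long> {

    private static final long TTL_MS = 60 * 60 * 1000; // one hour

    private transient ValueState<Long> count;
    private transient ValueState<Long> cleanupTime;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        cleanupTime = getRuntimeContext().getState(new ValueStateDescriptor<>("cleanupTime", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        out.collect(updated);

        // push the cleanup timer for this key one TTL into the future
        long newCleanupTime = ctx.timerService().currentProcessingTime() + TTL_MS;
        ctx.timerService().registerProcessingTimeTimer(newCleanupTime);
        cleanupTime.update(newCleanupTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long expected = cleanupTime.value();
        // only the most recently registered timer actually clears the state
        if (expected != null && expected == timestamp) {
            count.clear();
            cleanupTime.clear();
        }
    }
}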


On 11.03.2018 15:13, sundy wrote:

hi:

my streaming application always keys by keys that embed an event timestamp,
such as keyBy("qps_1520777430"), so the expired keys (from 1 hour ago) are useless.

And I use RocksDB to store the state. I want to know what's the best way to
clean up the expired RocksDB state - must I implement a ProcessFunction
and set a time ticker to do that?





Flink kafka connector with JAAS configurations crashed

2018-03-13 Thread sundy

Hi all,

I use the code below to set the Kafka JAAS config. The serverConfig.jasspath is
/data/apps/spark/kafka_client_jaas.conf, but on a Flink standalone deployment
it crashes. I am sure the kafka_client_jaas.conf is valid, because other
applications (Spark Streaming) are still working fine with it. So I think it may
not be a problem caused by the Kafka 0.10 client.
System.setProperty("java.security.auth.login.config", serverConfig.jasspath);
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");

The exception messages are:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:717)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:597)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:579)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:56)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:91)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:422)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: 
java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the 
JAAS configuration. System property 'java.security.auth.login.config' is 
/data/apps/spark/kafka_client_jaas.conf
at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
at 
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
at 
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:657)
... 11 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' 
entry in the JAAS configuration. System property 
'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
at 
org.apache.kafka.common.security.JaasUtils.defaultJaasConfig(JaasUtils.java:85)
at 
org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:67)
at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:85)
... 15 more


File content looks like below:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="xxx";
};

It seems like the kafka_client_jaas.conf file has been read, but the
KafkaClient entry could not be resolved.  That's very strange; other
applications with the same config are working fine. And I wrote a simple Java
program to test the file, and it works fine too.


public static void main(String[] args) {
  Map maps = new HashMap<>();
  System.setProperty("java.security.auth.login.config", 
"/data/apps/spark/kafka_client_jaas.conf");
  Configuration jassConfig = JaasUtils.jaasConfig(LoginType.CLIENT, maps);
  AppConfigurationEntry object[] = 
jassConfig.getAppConfigurationEntry("KafkaClient");
  for(AppConfigurationEntry entry : object){
System.out.println(entry.getOptions());
  }
}