Re: Flink Statefun and Feature computation

2022-03-10 Thread Federico D'Ambrosio
Hi Igal,

thank you so much for your response.

As for [2], I was mainly interested in how the state is stored physically.
Looking at the deployment files, I see the following file

https://github.com/apache/flink-statefun-playground/blob/main/deployments/k8s/04-statefun/01-statefun-runtime.yaml


where the state seems to be defined by the keys in the flink-conf.yaml:

state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB

As far as I can tell from the docs, the built-in backends are FS, HashMap
and RocksDB, but I could technically implement my own backend by implementing
this abstract class
<https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java>
(and a related factory). Is that correct?

Thank you again,
Federico

Il giorno gio 24 feb 2022 alle ore 15:11 Igal Shilman  ha
scritto:

> Hello,
>
> For (1) I welcome you to visit our documentation, and the many talks online,
> to understand more about the motivation and the value of StateFun. In a
> nutshell, StateFun provides a few building blocks that make building
> distributed stateful applications easier.
>
> For (2) check out our playground repository to see how storage is
> configured. It is completely defined by the SDK and is not configured by
> the Flink cluster configuration.
>
> I think that the use case you are describing is a good fit for StateFun.
> If you check out the latest Flink Forward videos, there were a few that
> described how to use
> StateFun for exactly that [3].
>
> Good luck!
> Igal
>
> [1] https://nightlies.apache.org/flink/flink-statefun-docs-stable/
> [2] https://github.com/apache/flink-statefun-playground
> [3] https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/videos
>
> On Sun, Feb 20, 2022 at 1:54 PM Federico D'Ambrosio 
> wrote:
>
>> Hello everyone,
>>
>> It's been quite a while since I wrote to the Flink ML, because in my
>> current job never actually arose the need for a stateful stream processing
>> system, until now.
>>
>> Since the last version I actually tried was Flink 1.9, well before
>> Stateful Functions, I had a few questions about some of the latest features.
>>
>> 1. What are the use cases for which Flink Statefuns were thought of? As
>> far as I understand from the documentation, they are basically processors
>> that can be separated from a "main" Flink streaming job (and can be
>> integrated with), but I fail to grasp how they should differ from a rest
>> endpoint implemented using any other framework.
>> 2. How is the storage for these functions configured? I see that the
>> storage for the state is accessed via a Context object, so I think it is
>> configured by a Flink cluster configuration?
>>
>> I would like, then, to elaborate on my use case: we have some 20 CDC
>> topics (1 topic per table) on Kafka. Upon the data streamed on these
>> topics, we need to compute many features to be used by a ML model. Many of
>> these features need to be computed by joining multiple topics and/or need
>> the whole history of the field. So, I was wondering if Stateful Functions
>> could be a good approach to this problem, where a feature could be
>> "packaged" in a single stateful function to be "triggered" by the arrival
>> of any new message on the topic configured as its ingress.
>>
>> So, basically, I'm wondering if they could fit the use case, or we're
>> better off with a custom flink job.
>>
>> Thank you for your time,
>> --
>> Federico D'Ambrosio
>>
>

-- 
Federico D'Ambrosio


Flink Statefun and Feature computation

2022-02-20 Thread Federico D'Ambrosio
Hello everyone,

It's been quite a while since I last wrote to the Flink ML, because the need
for a stateful stream processing system never actually arose in my current
job, until now.

Since the last version I actually tried was Flink 1.9, well before Stateful
Functions, I had a few questions about some of the latest features.

1. What are the use cases Flink Stateful Functions were designed for? As far
as I understand from the documentation, they are basically processors that
can be separated from a "main" Flink streaming job (and integrated with it),
but I fail to grasp how they should differ from a REST endpoint
implemented using any other framework.
2. How is the storage for these functions configured? I see that the
storage for the state is accessed via a Context object, so I think it is
configured by a Flink cluster configuration?

I would like, then, to elaborate on my use case: we have some 20 CDC topics
(1 topic per table) on Kafka. Upon the data streamed on these topics, we
need to compute many features to be used by a ML model. Many of these
features need to be computed by joining multiple topics and/or need the
whole history of the field. So, I was wondering if Stateful Functions could
be a good approach to this problem, where a feature could be "packaged" in
a single stateful function to be "triggered" by the arrival of any new
message on the topic configured as its ingress.

So, basically, I'm wondering if they could fit the use case, or whether we're
better off with a custom Flink job (a rough sketch of what I mean is below).
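
To make the comparison concrete, here is a minimal, hedged sketch of the
"custom Flink job" alternative for two of the CDC topics: a keyed two-stream
join that keeps the latest record per entity in state and recomputes a feature
on every new message. The CdcRecord/Feature case classes, the key field and
computeFeature are hypothetical placeholders, not part of any existing code.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class CdcRecord(entityId: String, payload: String)
case class Feature(entityId: String, value: Double)

class FeatureJoin extends CoProcessFunction[CdcRecord, CdcRecord, Feature] {
  // latest CDC record seen on each topic, kept per key
  @transient private var lastLeft: ValueState[CdcRecord] = _
  @transient private var lastRight: ValueState[CdcRecord] = _

  override def open(parameters: Configuration): Unit = {
    lastLeft  = getRuntimeContext.getState(new ValueStateDescriptor("left", classOf[CdcRecord]))
    lastRight = getRuntimeContext.getState(new ValueStateDescriptor("right", classOf[CdcRecord]))
  }

  override def processElement1(r: CdcRecord,
                               ctx: CoProcessFunction[CdcRecord, CdcRecord, Feature]#Context,
                               out: Collector[Feature]): Unit = {
    lastLeft.update(r)
    emitIfComplete(out)
  }

  override def processElement2(r: CdcRecord,
                               ctx: CoProcessFunction[CdcRecord, CdcRecord, Feature]#Context,
                               out: Collector[Feature]): Unit = {
    lastRight.update(r)
    emitIfComplete(out)
  }

  private def emitIfComplete(out: Collector[Feature]): Unit =
    if (lastLeft.value() != null && lastRight.value() != null)
      out.collect(Feature(lastLeft.value().entityId, computeFeature(lastLeft.value(), lastRight.value())))

  // placeholder for the actual feature logic
  private def computeFeature(a: CdcRecord, b: CdcRecord): Double = 0.0
}

// usage: topicA.keyBy(_.entityId).connect(topicB.keyBy(_.entityId)).process(new FeatureJoin)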

Thank you for your time,
-- 
Federico D'Ambrosio


Help with the correct Event Pattern

2019-07-25 Thread Federico D'Ambrosio
Hello everyone,

I need a bit of help concerning a correct formulation for a Complex Event
Pattern, using CEP.

I have a stream of events which, once keyed by id, may look like this:

a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1

what I want to achieve is to get, from a formulation similar to this:

[1] b c e

this:

b1 c1 e1

that is, for each input stream, to have an output composed of only the first
appearance of events of classes b, c and e.

I realize that a pattern formulated like [1] would also match:

b1 c2 e1, b1 c2 e2 and so on, so that I would need to refine it.

So, I tried using oneOrMore(), consecutive() and
AfterMatchSkipStrategy.skipToFirst, like this:

val b = Pattern
  .begin[Event]("b")
  .where((value, _) => value.state == "b")
  .oneOrMore().consecutive()

val c = Pattern
  .begin[Event]("c")
  .where((value, _) => value.state == "c")
  .oneOrMore().consecutive()

val e = Pattern
  .begin[Event]("e", AfterMatchSkipStrategy.skipToFirst("b"))
  .where((value, _) => value.state == "e")
  .oneOrMore().consecutive()

val pattern: Pattern[Event, _] =
  b.followedBy(c).followedBy(e)

In the process function I would do something like this:

override def processMatch(matches: util.Map[String, util.List[Event]],
  ctx: PatternProcessFunction.Context,
  out: Collector[OutputEvent]): Unit =  {

val bEvent = matches.get("b").asScala.head
val cEvent = matches.get("c").asScala.head
val eEvent = matches.get("e").asScala.head

out.collect(OutputEvent(bEvent, cEvent, eEvent))
}

But unfortunately it doesn't work as I want, which makes me think I'm
missing something about the functionality of Flink CEP.

What's the best way to achieve what I want? Is it possible?
Should I even use any AfterMatchSkipStrategy?
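
For context, this is a rough, untested sketch of the simpler formulation I am
comparing against, assuming that followedBy's relaxed contiguity only binds the
first matching event of each class and that skipPastLastEvent suppresses the
overlapping matches; it is not something I have verified:

val firstOccurrences = Pattern
  .begin[Event]("b", AfterMatchSkipStrategy.skipPastLastEvent())
  .where((value, _) => value.state == "b")
  .followedBy("c").where((value, _) => value.state == "c")
  .followedBy("e").where((value, _) => value.state == "e")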

Thank you,
Federico D'Ambrosio


Re: Date Handling in Table Source and transformation to DataStream of case class

2019-07-24 Thread Federico D'Ambrosio
Hi Caizhi,

thank you for your response, the full exception is the following:

Exception in thread "main" org.apache.flink.table.api.TableException: Arity
[7] of result [ArrayBuffer(String, String, String, String, String, String,
Timestamp)] does not match the number[1] of requested type
[GenericType].
at
org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:1165)
at
org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:423)
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:936)
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:866)
at
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173)
at
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:149)
at xautomata.StatePatterns$.main(StatePatterns.scala:119)
at xautomata.StatePatterns.main(StatePatterns.scala)

The code is the following:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource

import scala.language.implicitConversions

import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.TypeInformation

case class cEvent(state: String, id: String, device: String,
                  instance: String, subInstance: String, groupLabel: String,
                  time: Timestamp)

object cEvent {
  implicit val typeInformation: TypeInformation[cEvent] =
TypeInformation.of(classOf[cEvent])
}

object TableToStream {


  def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

val csvTableSource = CsvTableSource.builder()
  .path("instances.csv")
  .fieldDelimiter(",")
  .field("a", Types.STRING)
  .field("b", Types.STRING)
  .field("c", Types.STRING)
  .field("d", Types.STRING)
  .field("e", Types.STRING)
  .field("f", Types.STRING)
  .field("time", Types.SQL_TIMESTAMP)
  .build()

tEnv.registerTableSource("instances", csvTableSource)

val table: Table = tEnv
  .scan("instances")
  .select('a, 'b, 'c, 'd, 'e, 'f, 'time)

val cInstances: DataStream[cEvent] = tEnv.toAppendStream[cEvent](table)
  .assignAscendingTimestamps(_.time.getTime)

cInstances.print()

env.execute("Instances Log Stream")
  }

}


While writing this email and extracting the code, I noticed that what's
actually causing the issue is an import: the code I pasted here works fine,
but it breaks if I additionally import:

import org.apache.flink.api.scala._

Could you please verify that the same happens for you? Is that import meant
specifically for the Flink Batch API?
I fear that IntelliJ was showing me that import as unused, I thought nothing
of it and left it in, but I'm assuming it's causing some sort of collision
with

import org.apache.flink.streaming.api.scala._


Thank you,
Federico

Il giorno mer 24 lug 2019 alle ore 12:38 Caizhi Weng 
ha scritto:

> Hi Federico,
>
> I can't reproduce the error in my local environment. Would you mind
> sharing us your code and the full exception stack trace? This will help us
> diagnose the problem. Thanks.
>
> Federico D'Ambrosio  于2019年7月24日周三 下午5:45写道:
>
>> Hi Caizhi,
>>
>> thank you for your response.
>>
>> 1) I see, I'll use a compatible string format
>>
>> 2) I'm defining the case class like this:
>>
>> case class cEvent(state: String, id: String, device: String,
>>   instance: String, subInstance: String, groupLabel: String, 
>> time: Timestamp)
>>
>> object cEvent {
>>   implicit val typeInformation: TypeInformation[cEvent] = 
>> TypeInformation.of(classOf[cEvent])
>> }
>>
>>
>> I'm assuming I'm doing something wrong with the TypeInformation, since
>> the table records are not being converted correctly. The precise error is
>> the following:
>>
>> Arity [7] of result [ArrayBuffer(String, String, String, String, String,
>> String, Timestamp)] does not match the number[1] of requested type
>> [GenericType].
>>
>> I noticed there's a CaseClassTypeInfo which can be created from
>> Types.CASECLASS[cEvent], but I'm not sure how to use it after defining
>> the table.

Re: Date Handling in Table Source and transformation to DataStream of case class

2019-07-24 Thread Federico D'Ambrosio
Hi Caizhi,

thank you for your response.

1) I see, I'll use a compatible string format

2) I'm defining the case class like this:

case class cEvent(state: String, id: String, device: String,
                  instance: String, subInstance: String, groupLabel: String,
                  time: Timestamp)

object cEvent {
  implicit val typeInformation: TypeInformation[cEvent] =
TypeInformation.of(classOf[cEvent])
}


I'm assuming I'm doing something wrong with the TypeInformation, since the
table records are not being converted correctly. The precise error is the
following:

Arity [7] of result [ArrayBuffer(String, String, String, String, String,
String, Timestamp)] does not match the number[1] of requested type
[GenericType].

I noticed there's a CaseClassTypeInfo which can be created from
Types.CASECLASS[cEvent], but I'm not sure how to use it after defining the
table.
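
For reference, this is a sketch of what I had in mind to try instead of
TypeInformation.of(classOf[cEvent]) (untested; it assumes the
createTypeInformation macro from the Scala API package object is the intended
way to obtain a CaseClassTypeInfo for a case class):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

object cEventTypeInfo {
  // derives a CaseClassTypeInfo instead of falling back to a GenericType
  implicit val typeInfo: TypeInformation[cEvent] = createTypeInformation[cEvent]
}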

Thank you,
Federico

Il giorno mer 24 lug 2019 alle ore 10:42 Caizhi Weng 
ha scritto:

> Hi Federico,
>
> 1) As far as I know, you can't set a format for timestamp parsing
> currently (see `SqlTimestampParser`, it just feeds your string to
> `SqlTimestamp.valueOf`, so your timestamp format must be compatible with
> SqlTimestamp).
>
> 2) How do you define your case class? You have to define its parameter
> list and nothing in its body to make it work. For example: case class
> Event(a: String, b: String, time: Timestamp)
>
> Federico D'Ambrosio  于2019年7月24日周三 下午4:10写道:
>
>> Hello everyone,
>>
>> I've always used the DataStream API and now I'm trying out the Table API
>> to create a datastream from a CSV and I'm finding a couple of issues:
>>
>> 1) I'm reading a csv with 7 total fields, the 7th of which is a date
>> serialized as a Spark TimestampType, written on the csv like this:
>> 2019-07-19T15:31:38.000+01:00. I've defined the TableSource like this:
>> val csvTableSource = CsvTableSource.builder()
>> .path("sourcefile.csv")
>> .fieldDelimiter(",")
>> /* fields of Types.STRING */
>> .field("time", Types.SQL_TIMESTAMP)
>> .build()
>> I'm transforming the Table to a DataStream of type Event:
>>
>> class Event {
>>   // fields of type String
>>   var time: Timestamp = _
>> }
>>
>> val ds: DataStream[Event] = tEnv.toAppendStream[Event](table)
>> But when I'm reading from the CSV the following parsing error occurs:
>>
>> Caused by: org.apache.flink.api.common.io.ParseException: Parsing error
>> for column 7 of row '..,2019-07-20T09:52:07.000+01:00' originated by
>> SqlTimestampParser: NUMERIC_VALUE_FORMAT_ERROR.
>>
>> So, I'm wondering: is it possible to set a DateFormat or something to
>> make sure the parsing succeeds? I've tried also Types.SQL_DATE and
>> Types.SQL_TIME, but they fail with same exception.
>>
>> 2) My first option was to make Event as a case class, but with the same
>> table definition, I was having trouble with the conversion, with an error
>> telling that the "Arity of 7 fields was not compatible with the destination
>> arity of 1, of type GenericType". What's the correct way to handle
>> case classes? I changed to using a class (which I believe uses the POJO
>> serializer) and it works ok, but I'm still wondering how to make it work
>> with Case Classes, which come quite useful sometimes.
>>
>> Thank you very much,
>> Federico
>> --
>> Federico D'Ambrosio
>>
>

-- 
Federico D'Ambrosio


Date Handling in Table Source and transformation to DataStream of case class

2019-07-24 Thread Federico D'Ambrosio
Hello everyone,

I've always used the DataStream API and now I'm trying out the Table API to
create a datastream from a CSV and I'm finding a couple of issues:

1) I'm reading a csv with 7 total fields, the 7th of which is a date
serialized as a Spark TimestampType, written on the csv like this:
2019-07-19T15:31:38.000+01:00. I've defined the TableSource like this:
val csvTableSource = CsvTableSource.builder()
.path("sourcefile.csv")
.fieldDelimiter(",")
/* fields of Types.STRING */
.field("time", Types.SQL_TIMESTAMP)
.build()
I'm transforming the Table to a DataStream of type Event:

class Event {
  // fields of type String
  var time: Timestamp = _
}

val ds: DataStream[Event] = tEnv.toAppendStream[Event](table)
But when I'm reading from the CSV the following parsing error occurs:

Caused by: org.apache.flink.api.common.io.ParseException: Parsing error for
column 7 of row '..,2019-07-20T09:52:07.000+01:00' originated by
SqlTimestampParser: NUMERIC_VALUE_FORMAT_ERROR.

So, I'm wondering: is it possible to set a DateFormat or something to make
sure the parsing succeeds? I've also tried Types.SQL_DATE and Types.SQL_TIME,
but they fail with the same exception.
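
In case it helps frame the question, this is the workaround I am considering
(an untested sketch): read the column as a plain STRING and convert the
ISO-8601 value to java.sql.Timestamp afterwards, e.g. in a map step. The
helper name is just a placeholder.

import java.sql.Timestamp
import java.time.OffsetDateTime

// "2019-07-19T15:31:38.000+01:00" parses as an ISO-8601 offset date-time
def toSqlTimestamp(iso: String): Timestamp =
  Timestamp.from(OffsetDateTime.parse(iso).toInstant)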

2) My first option was to make Event a case class, but with the same
table definition I was having trouble with the conversion, with an error
stating that the "Arity of 7 fields was not compatible with the destination
arity of 1, of type GenericType". What's the correct way to handle
case classes? I changed to using a class (which I believe uses the POJO
serializer) and it works ok, but I'm still wondering how to make it work
with case classes, which come in quite handy sometimes.

Thank you very much,
Federico
-- 
Federico D'Ambrosio


Re: Async Source Function in Flink

2018-05-17 Thread Federico D'Ambrosio
I see, thank you very much for your answer! I'll look into connection pool
handling.

Alternatively, I suppose that since it is a SourceFunction, even
synchronous calls can be used without side effects in Flink?

Thank you,
Federico

Il giorno mar 15 mag 2018 alle ore 16:16 Timo Walther 
ha scritto:

> Hi Federico,
>
> Flink's AsyncFunction is meant for enriching a record with information
> that needs to be queried externally. So I guess you can't use it for your
> use case because an async call is initiated by the input. However, your
> custom SourceFunction could implement a similar asynchronous logic. By
> having a pool of open connections that request asynchronously and emit the
> response to the stream, once available, you can improve your throughput
> (see [0]).
>
> Depending on your use case maybe the SourceFunction can only be
> responsible for determining e.g. ids and the AsyncFunction is requesting
> these ids via REST. This way you could leverage the available async
> capabilities.
>
> I hope this helps.
>
> Regards,
> Timo
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/asyncio.html#the-need-for-asynchronous-io-operations
>
>
> Am 14.05.18 um 14:51 schrieb Federico D'Ambrosio:
>
> Hello everyone,
>
> just wanted to ask a quick question: I have to retrieve data from 2 web
> services via REST calls, use them as sources and push these data to Kafka.
> So far, I implemented a SourceFunction which deals with making the calls
> with the respective clients.
>
> Now, the function does use, for each REST call, Await.result(). Do I
> need to use Flink's AsyncFunction instead? What are the best practices when
> it comes to AsyncSources?
>
> Thank you,
> --
> Federico D'Ambrosio
>
>
>

-- 
Federico D'Ambrosio


Async Source Function in Flink

2018-05-14 Thread Federico D'Ambrosio
 Hello everyone,

just wanted to ask a quick question: I have to retrieve data from 2 web
services via REST calls, use them as sources and push this data to Kafka.
So far, I have implemented a SourceFunction which makes the calls with the
respective clients.

Now, the function uses Await.result() for each REST call. Do I need to use
Flink's AsyncFunction instead? What are the best practices when it comes to
async sources?
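
For context, here is a rough sketch of the shape of the source I have in mind,
with a small pool of threads issuing the REST calls concurrently and emitting
results as they arrive instead of a single blocking Await.result(); the
endpoint list and callRestEndpoint are hypothetical placeholders for the real
clients:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class RestPollingSource(endpoints: Seq[String]) extends SourceFunction[String] {
  @volatile private var running = true
  @transient private lazy val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4)) // small call pool

  override def run(ctx: SourceContext[String]): Unit = {
    while (running) {
      // fire the calls asynchronously and emit each response once available
      endpoints.map(url => Future(callRestEndpoint(url))(ec)).foreach { f =>
        f.foreach { response =>
          // SourceContext#collect is guarded by the checkpoint lock
          ctx.getCheckpointLock.synchronized(ctx.collect(response))
        }(ec)
      }
      Thread.sleep(1000) // polling interval, placeholder
    }
  }

  override def cancel(): Unit = running = false

  // placeholder for the blocking client call
  private def callRestEndpoint(url: String): String = s"""{"source":"$url"}"""
}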

Thank you,
-- 
Federico D'Ambrosio


Re: Timestamp from Kafka record and watermark generation

2018-02-23 Thread Federico D'Ambrosio
Thank you very much Aljoscha!

2018-02-23 14:45 GMT+01:00 Aljoscha Krettek :

> Hi,
>
> This is a know issue: https://issues.apache.org/jira/browse/FLINK-8500.
> And yes, the workaround is to write an assigner from scratch but you can
> start by copying the code of AscendingTimestampExtractor.
>
> Sorry for the inconvenience.
>
> --
> Aljoscha
>
> On 22. Feb 2018, at 12:05, Federico D'Ambrosio  wrote:
>
> Hello everyone,
>
> I'm consuming from a Kafka topic, on which I'm writing with a
> FlinkKafkaProducer, with the timestamp relative flag set to true.
>
> From what I gather from the documentation [1], Flink is aware of Kafka
> Record's timestamp and only the watermark should be set with an appropriate
> TimestampExtractor, still I'm failing to understand how to implement it in
> the right way.
>
> I thought that it would be possible to use the already existent
> AscendingTimestampExtractor, overriding the extractTimestamp method, but
> it's marked final.
>
> new FlinkKafkaConsumer010[Event](ingestion_topic, new 
> JSONDeserializationSchema(), consumerConfig)
>   .setStartFromLatest()
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
>   def extractAscendingTimestamp(element: Event): Long = ???
> })
>
> Should I need to implement my own TimestampExtractor (with the appropriate
> getCurrentWatermark and extractTimestamp methods) ?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>
> Thank you,
> Federico
>
>
>


-- 
Federico D'Ambrosio


Timestamp from Kafka record and watermark generation

2018-02-22 Thread Federico D'Ambrosio
Hello everyone,

I'm consuming from a Kafka topic, on which I'm writing with a
FlinkKafkaProducer with its timestamp flag set to true.

From what I gather from the documentation [1], Flink is aware of the Kafka
record's timestamp and only the watermark needs to be set with an appropriate
TimestampExtractor, yet I'm failing to understand how to implement it the
right way.

I thought that it would be possible to use the already existent
AscendingTimestampExtractor, overriding the extractTimestamp method, but
it's marked final.

new FlinkKafkaConsumer010[Event](ingestion_topic, new
JSONDeserializationSchema(), consumerConfig)
  .setStartFromLatest()
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
  def extractAscendingTimestamp(element: Event): Long = ???
})

Should I implement my own TimestampExtractor (with the appropriate
getCurrentWatermark and extractTimestamp methods)?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
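
For reference, this is a sketch of what I mean by implementing the extractor
myself (untested; the class name is just a placeholder, and it assumes
AssignerWithPeriodicWatermarks is the right interface to mimic
AscendingTimestampExtractor while keeping the timestamp that the Kafka 0.10
consumer already attached to the record):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class KafkaAscendingWatermarks extends AssignerWithPeriodicWatermarks[Event] {
  private var currentMax = Long.MinValue

  // keep the timestamp already attached by the Kafka consumer
  override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long = {
    currentMax = math.max(currentMax, previousElementTimestamp)
    previousElementTimestamp
  }

  override def getCurrentWatermark(): Watermark =
    new Watermark(if (currentMax == Long.MinValue) Long.MinValue else currentMax - 1)
}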


Thank you,
Federico


Re: How to deal with java.lang.IllegalStateException: Could not initialize keyed state backend after a code update

2017-11-28 Thread Federico D'Ambrosio
Hi Gordon,

explicitly specifying the serialVersionUID did the job, thank you! The
failing task was latest_time -> (cassandra-map -> Sink:
cassandra-active-sink, map_active_stream, map_history_stream), defined
like the following:

val events = keyedstream
  .window(Time.seconds(20))
  .maxBy("field").name("latest-time")

CassandraSink.addSink(
   events.map(_.toCassandraTuple).name("cassandra-map").javaStream)
.setQuery(...)
.setClusterBuilder(...)
.build().name("cassandra-sink")

with cassandra-map, map_history_stream and map_active_stream being stateless
map functions.
So, I guess the culprit was either the window/maxBy operator or the Cassandra
sink; my bet is on the window/maxBy operator, since the error mentions the
initialization of a keyed state.
I'm attaching the complete log.
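
For reference, the fix amounted to pinning the UID from the error log on the
state class, roughly like this (a sketch; the field list is elided and the id
field is just a placeholder):

@SerialVersionUID(8728793377341765980L) // value from "stream classdesc serialVersionUID" in the log
case class AirTrafficEventWithId(id: String /* ...remaining fields unchanged... */)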

Cheers,
Federico


2017-11-28 15:32 GMT+01:00 Tzu-Li (Gordon) Tai :

> Hi Federico,
>
> It seems like the state cannot be restored because the class of the state
> type (i.e., Event) had been modified since the savepoint, and therefore has
> a conflicting serialVersionUID with whatever it is in the savepoint.
> This can happen if Java serialization is used for some part of your state,
> and the class of the written data was modified while a fixed
> serialVersionUID was not explicitly specified for that class.
>
> To avoid this, you should explicitly set a serialVersionUID for the Event
> class.
> You can actually also do that now without losing state while also
> incorporating the modifications you were trying to do for your updated job.
> Explicitly declare the serialVersionUID of the Event class to what is was
> before your modifications (i.e., 8728793377941765980, according to your
> error log).
>
> One side question: are you experiencing this restore failure for one of
> your custom operator states, or is this failing state part of some Flink
> built-in operator / connector?
> I’m asking just to have an idea of which Flink built-in operator /
> connectors still use Java serialization for user state; ideally we would
> want that to be completely removed in the future.
>
> Cheers,
> Gordon
>
>
> On 28 November 2017 at 10:02:19 PM, Federico D'Ambrosio (
> federico.dambro...@smartlab.ws) wrote:
>
> Hi,
>
> I recently had to do a code update of a long running Flink Stream job
> (1.3.2) and on the restart from the savepoint I had to deal with:
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>
> Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local
> class incompatible: stream classdesc serialVersionUID = 8728793377341765980,
> local class serialVersionUID = -4253404384162522764
>
> because I have changed a method used to convert the Event to a Cassandra
> writable Tuple (in particular, I changed the return type from Tuple10 to
> Tuple11, after adding a field). I reverted those changes back since it
> wasn't much of a problem per se.
>
> Now, I understand the root cause of this issue and I wanted to ask if
> there are any "best practices" to prevent this kind of issues, without
> losing the state of the job, because of restarting it from the very
> beginning.
>
> --
> Federico D'Ambrosio
>
>


-- 
Federico D'Ambrosio
11/28/2017 14:53:28 latest_time -> (cassandra-map -> Sink: 
cassandra-active-sink, map_active_stream, map_history_stream)(1/1) switched to 
FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: 
lab.vardata.events.AirTrafficEventWithId; local class incompatible: stream 
classdesc serialVersionUID = 8728793377341765980, local class serialVersionUID 
= -4253404384162522764
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1484)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1334)
at java.io.ObjectInputStream.readObj

How to deal with java.lang.IllegalStateException: Could not initialize keyed state backend after a code update

2017-11-28 Thread Federico D'Ambrosio
Hi,

I recently had to do a code update of a long-running Flink streaming job
(1.3.2), and on restarting from the savepoint I had to deal with:

java.lang.IllegalStateException: Could not initialize keyed state backend.

Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local
class incompatible: stream classdesc serialVersionUID = 8728793377341765980,
local class serialVersionUID = -4253404384162522764

because I have changed a method used to convert the Event to a
Cassandra-writable Tuple (in particular, I changed the return type from
Tuple10 to Tuple11, after adding a field). I reverted those changes since it
wasn't much of a problem per se.

Now, I understand the root cause of this issue, and I wanted to ask if there
are any "best practices" to prevent this kind of issue without losing the
state of the job by having to restart it from the very beginning.

-- 
Federico D'Ambrosio


Re: Flink session on yarn

2017-11-20 Thread Federico D'Ambrosio
Hi Nishu,

did you compile Flink from sources as recommended here [1]?

Regards,
Federico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/building.html#vendor-specific-versions

2017-11-20 13:53 GMT+01:00 Nishu :

> Hi,
>
> I am trying to start flink session(v1.3.2) on yarn(v 2.7) on HDInsight
> cluster.  But it throws following error:
>
> *Error while deploying YARN cluster: Couldn't deploy Yarn cluster*
> *java.lang.RuntimeException: Couldn't deploy Yarn cluster*
> *at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:443)*
> *at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:630)*
> *at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:486)*
> *at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:483)*
> *at
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)*
> *at java.security.AccessController.doPrivileged(Native Method)*
> *at javax.security.auth.Subject.doAs(Subject.java:422)*
> *at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)*
> *at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)*
> *at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:483)*
> *Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.ClassNotFoundException: Class
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider not
> found*
> *at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)*
> *at
> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:161)*
> *at
> org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)*
> *at
> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)*
>
>
> I also referred to this mail thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-session-on-Yarn-ClassNotFoundException-td15222.html
>
> I am using an HDInsight cluster (HDP 3.6). According to the documentation,
> I set HADOOP_CONF_DIR and YARN_CONF_DIR as well.
>
> Any inputs will be really helpful. Thanks!
>
> --
> Thanks & Regards,
> Nishu Tayal
>



-- 
Federico D'Ambrosio


Re: FlinkCEP behaviour with time constraints not as expected

2017-11-08 Thread Federico D'Ambrosio
Thank you very much, that was really helpful

Cheers,
Federico

2017-11-08 13:51 GMT+01:00 Dawid Wysakowicz :

> Unfortunately there is a mistake in the docs: the return type should be
> DataStream rather than SingleOutputStreamOperator.
>
> The correct version should be:
>
> val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
>
> val outputTag = OutputTag[String]("side-output")
>
> val result: DataStream[ComplexEvent] = patternStream.select(outputTag){
> (pattern: Map[String, Iterable[Event]], timestamp: Long) =>
> TimeoutEvent()
> } {
> pattern: Map[String, Iterable[Event]] => ComplexEvent()
> }
>
> This syntax is only available in 1.4 though; in previous versions timed-out
> events were not returned via a side output.
>
>
>
> > On 8 Nov 2017, at 12:18, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
> >
> > Thank you very much, Dawid, for your thorough explanation, really
> useful. I totally missed the distinction between timed-out events and
> complete matches.
> >
> > I'd like to ask you one more thing, about the flinkCEP scala api: in the
> documentation, there is the following code:
> >
> > val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
> >
> >
> >
> > val outputTag = OutputTag[String]("side-output")
> >
> >
> >
> > val result: SingleOutputStreamOperator[ComplexEvent] =
> patternStream.select(outputTag){
> >
> >
> > (pattern: Map[String, Iterable[Event]], timestamp: Long) =>
> TimeoutEvent()
> > } {
> >
> >
> > pattern: Map[String, Iterable[Event]] => ComplexEvent()
> > }
> >
> > where result would then be used to get outputtag side output.
> > If I paste this code I get that the select function is missing its
> parameters ("Unspecified value parameters: patternSelectFunction:
> PatternSelectFunction[ComplexEvent, NotInferredR]""),
> > while, If I add the parameters explicitly such as
> >
> > patternStream.select[TimeoutEvent, ComplexEvent]
> >
> > I get "Too many arguments for select". Am I missing something?
> >
> > Thank you very much,
> > Federico
> >
> > 2017-11-07 16:34 GMT+01:00 Dawid Wysakowicz  >:
> > Hi Federico,
> >
> > For your given input and pattern there should (and there are) only two
> timeouted patterns:
> >
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
> >
> > It is because in your patterns say the next event after events with
> value >=100 should not have value >= 100 . And within your timeout there is
> no sequence of events where (>=100)+ (<100).
> >
> > But I will try to explain how it works with the same input for Pattern:
> >
> > Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> > .notNext("end").where(_.value <100).within(Time.minutes(30))
> >
> > Then we have matches:
> >
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02
> >
> > and timeouted partial matches:
> >
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02),
> Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02),
> Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> > 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
> >
> > Right now (in flink 1.3.2) pattern can start on each event (in 1.4 you
> will be able to specify AFTER_MATCH_SKIP strategy see:
> https://issues.apache.org/jira/browse/FLINK-7169), therefore you see
> matches starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02,
> 2017-11-05T03:54:02.
> > Also right now the oneOrMore is not greedy (in 1.4 you will be able to

Re: FlinkCEP behaviour with time constraints not as expected

2017-11-08 Thread Federico D'Ambrosio
Thank you very much, Dawid, for your thorough explanation, really useful. I
totally missed the distinction between timed-out events and complete
matches.

I'd like to ask you one more thing, about the FlinkCEP Scala API: in the
documentation, there is the following code:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val outputTag = OutputTag[String]("side-output")
val result: SingleOutputStreamOperator[ComplexEvent] =
patternStream.select(outputTag){
(pattern: Map[String, Iterable[Event]], timestamp: Long) =>
TimeoutEvent()} {
pattern: Map[String, Iterable[Event]] => ComplexEvent()}

where result would then be used to get outputtag side output.

If I paste this code I get that the select function is missing its
parameters ("Unspecified value parameters: patternSelectFunction:
PatternSelectFunction[ComplexEvent, NotInferredR]""),
while, If I add the parameters explicitly such as

patternStream.select[TimeoutEvent, ComplexEvent]

I get "Too many arguments for select". Am I missing something?

Thank you very much,

Federico


2017-11-07 16:34 GMT+01:00 Dawid Wysakowicz :

> Hi Federico,
>
> For your given input and pattern there should (and there are) only two
> timeouted patterns:
>
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
>
> It is because your pattern says the next event after events with value
> >= 100 should not have value >= 100. And within your timeout there is no
> sequence of events where (>=100)+ (<100).
>
> But I will try to explain how it works with the same input for Pattern:
>
> Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> .notNext("end").where(_.value <100).within(Time.minutes(30))
>
> Then we have matches:
>
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02
> 5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02
>
> and timeouted partial matches:
>
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02),
> Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02),
> Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02),
> Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02),
> Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02
> 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02
>
> Right now (in Flink 1.3.2) a pattern can start on each event (in 1.4 you
> will be able to specify an AFTER_MATCH_SKIP strategy, see:
> https://issues.apache.org/jira/browse/FLINK-7169), therefore you see
> matches starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02 and
> 2017-11-05T03:54:02.
> Also right now the oneOrMore is not greedy (in 1.4 you will be able to
> alter it see: https://issues.apache.org/jira/browse/FLINK-7147),
> therefore you see matches like: List(Event(100,2017-11-05T03:50:02)) and
> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02))
> rather than only one of those.
>
> The timed-out partial matches are returned because within the timeout there
> was no event with value <100 (in fact there was no event at all to be
> checked).
>
> Hope this "study" helps you understand the behaviour. If you feel I missed
> something, please provide some example I could reproduce.
>
> Regards,
> Dawid
>
> 2017-11-07 11:29 GMT+01:00 Ufuk Celebi :
>
>> Hey Federico,
>>
>> let me pull in Dawid (cc'd) who works on CEP. He can probably clarify
>> the expected behaviour here.
>>
>> Best,
>>
>> Ufuk
>>
>>
>> On Mon, Nov 6, 2017 at 12:06 PM, Federico D'Ambrosio
>>  wrote:
>> > Hi everyone,
>> >
>> > I wanted to ask if FlinkCEP in the following scenario is working as it
>> > should, or I have misunderstood its functioning.
>> >
>> > I've got a keyedstream associated with the following pattern:
>> >
>> > Pattern[Event].begin("start").where(_.value >=100).oneOrMore
>> > .notNext("end").where(_.value >=100).within(Time.minutes(30))
>> >
>>

FlinkCEP behaviour with time constraints not as expected

2017-11-06 Thread Federico D'Ambrosio
Hi everyone,

I wanted to ask whether FlinkCEP in the following scenario is working as it
should, or I have misunderstood how it works.

I've got a keyedstream associated with the following pattern:

Pattern[Event].begin("start").where(_.value >=100).oneOrMore
.notNext("end").where(_.value >=100).within(Time.minutes(30))

Considering a single key in the stream, for simplicity, I've got the
following sequence of events (using EventTime on the "time" field of the
json event):

{value: 100, time: "2017-11-05 03:50:02.000"}
{value: 100, time: "2017-11-05 03:52:02.000"}
{value: 100, time: "2017-11-05 03:54:02.000"}
{value: 100, time: "2017-11-05 03:56:02.000"} // end of events within the
30 minutes from the first event
{value: 100, time: "2017-11-05 06:00:02.000"}

Now, when it comes to the select/flatselect function, I tried printing the
content of the pattern map, and I noticed that, for example, the first 2
events weren't considered part of the same pattern, as the map looked like
the following:

{start=[{value: 100, time: 2017-11-05 03:50:02.000}]}
{start=[{value: 100, time: 2017-11-05 03:52:02.000}]}

Now, shouldn't they be in the same List, as they belong to the same
iterative pattern, defined with the oneOrMore clause?
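
For comparison, this is an untested sketch of how I would expect to express
this on the upcoming 1.4 API, assuming the greedy quantifier from FLINK-7147
and the after-match skip strategy from FLINK-7169 work as advertised:

val pattern14 = Pattern
  .begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent())
  .where(_.value >= 100).oneOrMore.greedy
  .notNext("end").where(_.value >= 100)
  .within(Time.minutes(30))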

Thank you for your insight,
Federico D'Ambrosio


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Thank you very much for your steady response, Kostas!

Cheers,
Federico

2017-11-03 16:26 GMT+01:00 Kostas Kloudas :

> Hi Federico,
>
> Thanks for trying it out!
> Great to hear that your problem was fixed!
>
> The feature freeze for the release is going to be next week, and I would
> expect 1 or 2 more weeks of testing.
> So I would say in 2.5 weeks. But this is of course subject to potential
> issues we may find during testing.
>
> Cheers,
> Kostas
>
> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
> Hi Kostas,
>
> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it
> didn't crash, so that was the same underlying issue of the JIRA you linked.
>
> Do you happen to know when it's expected the 1.4 stable release?
>
> Thank you very much,
> Federico
>
> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas :
>
>> Perfect! thanks a lot!
>>
>> Kostas
>>
>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>> federico.dambro...@smartlab.ws> wrote:
>>
>> Hi Kostas,
>>
>> yes, I'm using 1.3.2. I'll try the current master and I'll get back to
>> you.
>>
>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas :
>>
>>> Hi Federico,
>>>
>>> I assume that you are using Flink 1.3, right?
>>>
>>> In this case, in 1.4 we have fixed a bug that seems similar to your case:
>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>
>>> Could you try the current master to see if it fixes your problem?
>>>
>>> Thanks,
>>> Kostas
>>>
>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>> federico.dambro...@smartlab.ws> wrote:
>>>
>>>  Could not find id for entry:
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Federico D'Ambrosio
>>
>>
>>
>
>
> --
> Federico D'Ambrosio
>
>
>


-- 
Federico D'Ambrosio


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Hi Kostas,

I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it
didn't crash, so it was the same underlying issue as in the JIRA you linked.

Do you happen to know when the 1.4 stable release is expected?

Thank you very much,
Federico

2017-11-03 15:25 GMT+01:00 Kostas Kloudas :

> Perfect! thanks a lot!
>
> Kostas
>
> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
> Hi Kostas,
>
> yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.
>
> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas :
>
>> Hi Federico,
>>
>> I assume that you are using Flink 1.3, right?
>>
>> In this case, in 1.4 we have fixed a bug that seems similar to your case:
>> https://issues.apache.org/jira/browse/FLINK-7756
>>
>> Could you try the current master to see if it fixes your problem?
>>
>> Thanks,
>> Kostas
>>
>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>> federico.dambro...@smartlab.ws> wrote:
>>
>>  Could not find id for entry:
>>
>>
>>
>>
>
>
> --
> Federico D'Ambrosio
>
>
>


-- 
Federico D'Ambrosio


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Hi Kostas,

yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.

2017-11-03 15:21 GMT+01:00 Kostas Kloudas :

> Hi Federico,
>
> I assume that you are using Flink 1.3, right?
>
> In this case, in 1.4 we have fixed a bug that seems similar to your case:
> https://issues.apache.org/jira/browse/FLINK-7756
>
> Could you try the current master to see if it fixes your problem?
>
> Thanks,
> Kostas
>
> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
>  Could not find id for entry:
>
>
>
>


-- 
Federico D'Ambrosio


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
tasks.StreamTask.initializeOperators(StreamTask.java:676)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
at
java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
at
java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
at
java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
at
java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
at
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
... 6 more

2017-11-03 15:12 GMT+01:00 Federico D'Ambrosio <
federico.dambro...@smartlab.ws>:

> Hello everyone,
>
> I'm experimenting a bit with FlinkCEP and I'm noticing weird failures when
> a within-clause window closes at the same time a (synchronous, both on Fs
> and RocksDB, stored in HDFS) checkpoint occurs.
>
> The following is the relevant code:
>
> val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(60000) // Checkpoints every minute
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))
>
> //Pattern
> val pattern =
>   Pattern
>     .begin[EventWithId]("flying").oneOrMore.where(_.event.instantValues.altitude >= 37000)
>     .notNext("disappearing").where(_.event.instantValues.altitude >= 37000).within(Time.minutes(1))
>
> // Associate KeyedStream with pattern to be detected
> val patternStream  = CEP.pattern(streamById, pattern)
>
> which causes failure on the second checkpoint with the following exception
> stack trace:
>
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2
> for operator KeyedCEPPatternOperator -> alert-select -> Sink:
> notification-sink-1 (1/1).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator
> KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException:
> Could not find id for entry: SharedBufferEntry(
> ValueTimeWrapper(

FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

11/03/2017 13:46:57 Job execution switched to status FAILING.
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more


What is happening here? Am I doing something wrong? Is there some sort of
conflict between within-clause deadlines and checkpoint deadlines?

I found the following similar JIRA pages, but none of those mention
circular references: https://issues.apache.org/jira/browse/FLINK-6321
https://issues.apache.org/jira/browse/FLINK-7484
https://issues.apache.org/jira/browse/FLINK-7756

Kind Regards,
Federico D'Ambrosio


Could not initialize keyed state backend on restart from checkpoint

2017-10-24 Thread Federico D'Ambrosio
Hello everyone,

while trying to restart a flink job from an externalized checkpoint I'm
getting the following exception:

java.lang.IllegalStateException: Could not initialize keyed state backend.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to restore keyed state
[window-contents]. For memory-backed keyed state, the previous serializer
of the keyed state must be present; the serializer could have been removed
from the classpath, or its implementation have changed and could not be
loaded. This is a temporary restriction that will be fixed in future
versions.
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:465)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
... 6 more

The failing task causing this exception is
"latest_time" -> "map_active_stream"; it uses JodaDateTimeSerializer and
behaves as follows:

// Preprocessing with Aggregation to get only the most recent event
val airtrafficEvents = streamByID
.window(TumblingEventTimeWindows.of(Time.seconds(20)))
.maxBy("airTrafficEvent").name("latest_time").uid("latest_time")

// Sinks
val activeStream = airtrafficEvents
.map(event =>
event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
.timeWindowAll(Time.seconds(10))
.apply(new
MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
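
For completeness, the JodaDateTimeSerializer registration mentioned above is
along these lines (a sketch; it assumes env is the StreamExecutionEnvironment
and the serializer class from the kryo-serializers library):

import org.joda.time.DateTime
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer

env.getConfig.registerTypeWithKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])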

This exception occurred after restarting the job from an externalized
checkpoint, after rebuilding the uber-jar because of the removal of a sink
that wasn't needed anymore, thus using --allowNonRestoredState while
restarting. I'd like to stress that the serializer has always been on the
classpath, inside the uber-jar, and no change of implementation was made
between executions.

I reproduced this behaviour by commenting in and out this sink, rebuilding
and restarting the job both from a savepoint and an externalized checkpoint.

Do you have any insight on this?

Cheers,
Federico D'Ambrosio


FlinkCEP: pattern application on a KeyedStream

2017-10-19 Thread Federico D'Ambrosio
Hi all,

I was wondering if it is correct to assume that the application of a pattern
on a KeyedStream is similar, when it comes to state, to the application of,
e.g., a MapFunction.

For example, the following

val pattern = ...
val keyedStream = stream.keyBy("id")

val patternKeyedStream = CEP.pattern(keyedStream, pattern)

val anotherKeyedStream = patternKeyedStream.select(...)

should check the pattern on each key partition separately.

Am I correct in assuming this, or have I misunderstood how CEP works?

-- 
Federico D'Ambrosio


Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-10-12 Thread Federico D'Ambrosio
Hi Aljoscha,

yes, just like you're guessing, without asynchronous checkpoints, there has
been no crash so far.

Regards,
Federico

2017-10-12 18:08 GMT+02:00 Aljoscha Krettek :

> Hi Federico,
>
> I'm guessing the job is still working without asynchronous checkpoints? I'm
> very eager to figure out what is actually going wrong with asynchronous
> checkpoints.
>
> Best,
> Aljoscha
>
>
> On 2. Oct 2017, at 11:57, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
> As a followup:
>
> the flink job has currently an uptime of almost 24 hours, with no
> checkpoint failed or restart whereas, with async snapshots, it would have
> already crashed 50 or so times.
>
> Regards,
> Federico
>
> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <
> federico.dambro...@smartlab.ws>:
>
>> Thank you very much, Gordon.
>>
>> I'll try to run the job without the asynchronous snapshots first thing.
>>
>> As for the Event data type: it's a case class with 2 fields: a String ID
>> and a composite case class (let's call it RealEvent) containing 3 fields of
>> the following types: Information, which is a case class with String fields,
>> Coordinates, a nested case class with 2 Double and InstantValues, with 3
>> Integers and a DateTime. This DateTime field in InstantValues is the one
>> being evaluated in the maxBy (via InstantValues and RealEvent compareTo
>> implementations, because dot notation is not working in scala as of 1.3.2,
>> FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>) and that
>> was the reason in the first place I had to register the
>> JodaDateTimeSerializer with Kryo.
>>
>> Regards,
>> Federico
>>
>>
>>
>>
>> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai :
>>
>>> Hi,
>>>
>>> Thanks for the extra info, it was helpful (I’m not sure why your first
>>> logs didn’t have the full trace, though).
>>>
>>> I spent some time digging through the error trace, and currently have
>>> some observations I would like to go through first:
>>>
>>> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
>>> trying to access the state and making a copy (via serialization) in the
>>> CopyOnWriteStateTable.
>>> 2. The state that caused the exception seems to be the state of the
>>> reducing window function (i.e. the maxBy). The state type should be the
>>> same as the records in your `events` DataStream, which seems to be a Scala
>>> case class with some nested field that requires Kryo for serialization.
>>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when
>>> trying to copy that field ..
>>>
>>> My current guess would perhaps be that the serializer internally used
>>> may have been incorrectly shared, which is probably why this exception
>>> happens randomly for you.
>>> I recall that there were similar issues that occurred before due to the
>>> fact that some KryoSerializers aren't thread-safe and was incorrectly
>>> shared in Flink.
>>>
>>> I may need some help from you to be able to look at this a bit more:
>>> - Is it possible that you disable asynchronous snapshots and try running
>>> this job a bit more to see if the problem still occurs? This is mainly to
>>> eliminate my guess on whether or not there is some incorrect serializer
>>> usage in the CopyOnWriteStateTable.
>>> - Could you let us know what your `events` DataStream records type case
>>> class looks like?
>>>
>>> Also looping in Aljoscha and Stefan here, as they would probably have
>>> more insights in this.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
>>> federico.dambro...@smartlab.ws) wrote:
>>>
>>> Hi Gordon,
>>>
>>> I remembered that I had already seen this kind of exception once during
>>> the testing of the current job and fortunately I had the complete
>>> stacktrace still saved on my pc:
>>>
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:15
>>> 7)
>>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>>> zer.copy(KryoSerializer.java:176)
>&

Re: How to deal with blocking calls inside a Sink?

2017-10-02 Thread Federico D'Ambrosio
Hi Timo,

thank you for your response. Just yesterday I tried the JDBC connector and
unfortunately found out that the HivePreparedStatement and HiveStatement
implementations still don't provide an addBatch implementation, which the
connector relies on. The first dirty solution that came to my mind was to
slightly modify the current JDBCOutputFormat so that it concatenates the
insert queries as strings and passes them to the execute() method of the
Statement.

I guess that using AsyncIO would be really the best approach, as you're
saying.
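
To make it concrete, this is roughly what I have in mind, written against the
Flink 1.3 async I/O API (just a sketch: BlockingHiveWriter is a made-up
placeholder for the actual Hive Streaming calls, and the timeout and capacity
values are arbitrary):

import java.util.concurrent.{Executors, TimeUnit}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction}

// made-up placeholder wrapping the blocking Hive Streaming calls
class BlockingHiveWriter extends Serializable {
  def write(record: String): Unit = { /* txnBatch.write(...), commit, close, ... */ }
}

class HiveAsyncFunction extends AsyncFunction[String, String] {
  // dedicated pool so the blocking calls don't stall the operator thread
  @transient private lazy val pool = Executors.newFixedThreadPool(4)
  @transient private implicit lazy val ec: ExecutionContext =
    ExecutionContext.fromExecutor(pool)
  @transient private lazy val writer = new BlockingHiveWriter

  override def asyncInvoke(input: String, collector: AsyncCollector[String]): Unit =
    Future {
      writer.write(input) // blocking call runs on the pool
      input
    }.onComplete {
      case Success(v) => collector.collect(Iterable(v))
      case Failure(t) => collector.collect(t)
    }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val records = env.fromElements("record-1", "record-2")

// at most 100 in-flight elements, 30 s timeout per element
val written = AsyncDataStream.unorderedWait(
  records, new HiveAsyncFunction, 30, TimeUnit.SECONDS, 100)

written.print()
env.execute("async-hive-sketch")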

Regards,
Federico

2017-10-02 12:17 GMT+02:00 Timo Walther :

> Hi Federico,
>
> would it help to buffer events first and perform batches of insertions for
> better throughput? I saw some similar work recently here:
> https://tech.signavio.com/2017/postgres-flink-sink
>
> But I would first try the AsyncIO approach, because actually this is a use
> case it was made for.
>
> Regards,
> Timo
>
>
> Am 10/2/17 um 11:53 AM schrieb Federico D'Ambrosio:
>
> Hi, I've implemented a sink for Hive as a RichSinkFunction, but once I've
> integrated it in my current flink job, I noticed that the processing of the
> events slowed down really bad, I guess because of some blocking calls that
> need to be made when interacting with the Hive streaming API.
>
> So, what can be done to make it so the throughput doesn't get hurt by
> these calls? I guess increasing (by a lot) the parallelism of the sink
> operator could be a solution, but I'd think it's not really a good one.
>
> Maybe using the AsyncFunction API? Decoupling the sink in a buffer which
> sends the data + operations to be made in the asyncInvoke method of the
> AsyncFunction?
>
> Any suggestion is appreciated.
> Kind regards,
> Federico D'Ambrosio
>
>
>


-- 
Federico D'Ambrosio


Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-10-02 Thread Federico D'Ambrosio
As a followup:

the Flink job currently has an uptime of almost 24 hours, with no failed
checkpoints or restarts, whereas with async snapshots it would have already
crashed 50 or so times.

Regards,
Federico

2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <
federico.dambro...@smartlab.ws>:

> Thank you very much, Gordon.
>
> I'll try to run the job without the asynchronous snapshots first thing.
>
> As for the Event data type: it's a case class with 2 fields: a String ID
> and a composite case class (let's call it RealEvent) containing 3 fields of
> the following types: Information, which is a case class with String fields,
> Coordinates, a nested case class with 2 Double and InstantValues, with 3
> Integers and a DateTime. This DateTime field in InstantValues is the one
> being evaluated in the maxBy (via InstantValues and RealEvent compareTo
> implementations, because dot notation is not working in scala as of 1.3.2,
> FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>) and that
> was the reason in the first place I had to register the
> JodaDateTimeSerializer with Kryo.
>
> Regards,
> Federico
>
>
>
>
> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai :
>
>> Hi,
>>
>> Thanks for the extra info, it was helpful (I’m not sure why your first
>> logs didn’t have the full trace, though).
>>
>> I spent some time digging through the error trace, and currently have
>> some observations I would like to go through first:
>>
>> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
>> trying to access the state and making a copy (via serialization) in the
>> CopyOnWriteStateTable.
>> 2. The state that caused the exception seems to be the state of the
>> reducing window function (i.e. the maxBy). The state type should be the
>> same as the records in your `events` DataStream, which seems to be a Scala
>> case class with some nested field that requires Kryo for serialization.
>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when
>> trying to copy that field ..
>>
>> My current guess would perhaps be that the serializer internally used may
>> have been incorrectly shared, which is probably why this exception happens
>> randomly for you.
>> I recall that there were similar issues that occurred before due to the
>> fact that some KryoSerializers aren't thread-safe and was incorrectly
>> shared in Flink.
>>
>> I may need some help from you to be able to look at this a bit more:
>> - Is it possible that you disable asynchronous snapshots and try running
>> this job a bit more to see if the problem still occurs? This is mainly to
>> eliminate my guess on whether or not there is some incorrect serializer
>> usage in the CopyOnWriteStateTable.
>> - Could you let us know what your `events` DataStream records type case
>> class looks like?
>>
>> Also looping in Aljoscha and Stefan here, as they would probably have
>> more insights in this.
>>
>> Cheers,
>> Gordon
>>
>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
>> federico.dambro...@smartlab.ws) wrote:
>>
>> Hi Gordon,
>>
>> I remembered that I had already seen this kind of exception once during
>> the testing of the current job and fortunately I had the complete
>> stacktrace still saved on my pc:
>>
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>> zer.copy(KryoSerializer.java:176)
>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.cop
>> y(CaseClassSerializer.scala:101)
>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.cop
>> y(CaseClassSerializer.scala:32)
>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.cop
>> y(CaseClassSerializer.scala:101)
>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.cop
>> y(CaseClassSerializer.scala:32)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.
>> get(CopyOnWriteStateTable.java:279)
>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.
>> get(CopyOnWriteStateTable.java:296)
>> at org.apache.flink.runtime.state.heap.HeapReducingState.get(
>> HeapReducingState.java:68)
>> at org.apa

How to deal with blocking calls inside a Sink?

2017-10-02 Thread Federico D'Ambrosio
Hi, I've implemented a sink for Hive as a RichSinkFunction, but once I
integrated it in my current Flink job, I noticed that the processing of the
events slowed down really badly, I guess because of some blocking calls that
need to be made when interacting with the Hive streaming API.

So, what can be done so that the throughput doesn't get hurt by these
calls? I guess increasing (by a lot) the parallelism of the sink operator
could be a solution, but I don't think it's really a good one.

Maybe using the AsyncFunction API? Decoupling the sink in a buffer which
sends the data + operations to be made in the asyncInvoke method of the
AsyncFunction?
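
For instance, one thing I sketched is simply buffering inside the sink and
doing one blocking call per batch instead of one per record, roughly like
this (HiveClient is a made-up placeholder and the batch handling is
simplified):

import scala.collection.mutable.ArrayBuffer

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// made-up placeholder for the blocking Hive streaming calls
class HiveClient extends Serializable {
  def writeBatch(records: Seq[String]): Unit = { /* open txn batch, write, commit */ }
}

class BufferingHiveSink(batchSize: Int) extends RichSinkFunction[String] {

  @transient private var buffer: ArrayBuffer[String] = _
  @transient private var client: HiveClient = _

  override def open(parameters: Configuration): Unit = {
    buffer = ArrayBuffer.empty[String]
    client = new HiveClient
  }

  override def invoke(value: String): Unit = {
    buffer += value
    if (buffer.size >= batchSize) flush()
  }

  override def close(): Unit = {
    if (buffer != null && buffer.nonEmpty) flush()
  }

  // one blocking call per batch instead of one per record
  private def flush(): Unit = {
    client.writeBatch(buffer)
    buffer.clear()
  }
}

Of course this keeps records in memory between batches, so anything still
buffered would be lost on a failure unless it is also put into checkpointed
state.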

Any suggestion is appreciated.
Kind regards,
Federico D'Ambrosio


Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-09-30 Thread Federico D'Ambrosio
Thank you very much, Gordon.

I'll try to run the job without the asynchronous snapshots first thing.

As for the Event data type: it's a case class with 2 fields: a String ID
and a composite case class (let's call it RealEvent) containing 3 fields of
the following types: Information, which is a case class with String fields,
Coordinates, a nested case class with 2 Double and InstantValues, with 3
Integers and a DateTime. This DateTime field in InstantValues is the one
being evaluated in the maxBy (via InstantValues and RealEvent compareTo
implementations, because dot notation is not working in scala as of 1.3.2,
FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>) and that was
the reason in the first place I had to register the JodaDateTimeSerializer
with Kryo.

Regards,
Federico




2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai :

> Hi,
>
> Thanks for the extra info, it was helpful (I’m not sure why your first
> logs didn’t have the full trace, though).
>
> I spent some time digging through the error trace, and currently have some
> observations I would like to go through first:
>
> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
> trying to access the state and making a copy (via serialization) in the
> CopyOnWriteStateTable.
> 2. The state that caused the exception seems to be the state of the
> reducing window function (i.e. the maxBy). The state type should be the
> same as the records in your `events` DataStream, which seems to be a Scala
> case class with some nested field that requires Kryo for serialization.
> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying
> to copy that field ..
>
> My current guess would perhaps be that the serializer internally used may
> have been incorrectly shared, which is probably why this exception happens
> randomly for you.
> I recall that there were similar issues that occurred before due to the
> fact that some KryoSerializers aren't thread-safe and was incorrectly
> shared in Flink.
>
> I may need some help from you to be able to look at this a bit more:
> - Is it possible that you disable asynchronous snapshots and try running
> this job a bit more to see if the problem still occurs? This is mainly to
> eliminate my guess on whether or not there is some incorrect serializer
> usage in the CopyOnWriteStateTable.
> - Could you let us know what your `events` DataStream records type case
> class looks like?
>
> Also looping in Aljoscha and Stefan here, as they would probably have more
> insights in this.
>
> Cheers,
> Gordon
>
> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
> federico.dambro...@smartlab.ws) wrote:
>
> Hi Gordon,
>
> I remembered that I had already seen this kind of exception once during
> the testing of the current job and fortunately I had the complete
> stacktrace still saved on my pc:
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.copy(KryoSerializer.java:176)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> copy(CaseClassSerializer.scala:101)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> copy(CaseClassSerializer.scala:32)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> copy(CaseClassSerializer.scala:101)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.
> copy(CaseClassSerializer.scala:32)
> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(
> CopyOnWriteStateTable.java:279)
> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(
> CopyOnWriteStateTable.java:296)
> at org.apache.flink.runtime.state.heap.HeapReducingState.
> get(HeapReducingState.java:68)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.onEventTime(WindowOperator.java:498)
> at org.apache.flink.streaming.api.operators.
> HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:
> 275)
> at org.apache.flink.streaming.api.operators.
> InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.
> java:107)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$
> ForwardingValveOutputHandler.handleWatermark(
> StreamInputProcessor.java:286)
> a

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-09-30 Thread Federico D'Ambrosio
Hi Gordon,

I remembered that I had already seen this kind of exception once during the
testing of the current job and fortunately I had the complete stacktrace
still saved on my pc:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
at
org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

I don't know why the stack trace is now being printed only for the first
part (handleWatermark and HeapReducingState).

So, it looks like something related to the KryoSerializer. The Kryo
serializer I'm using is the JodaDateTimeSerializer, registered as follows:

env.getConfig.addDefaultKryoSerializer(classOf[DateTime],
classOf[JodaDateTimeSerializer])

I hope this could help.

Regards,
Federico

2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <
federico.dambro...@smartlab.ws>:

> Hi Gordon,
>
> I'm currently using Flink 1.3.2 in local mode.
>
> If it's any help I realized from the log that the complete task which is
> failing is:
>
> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.
> taskmanager.Task - latest_time -> (map_active_stream,
> map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched
> from RUNNING to FAILED.
>
> val events = keyedStreamByID
>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>   .maxBy("time").name("latest_time").uid("latest_time")
>
>
> val activeStream = events
>   //Serialization to JsValue
>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid(
> "map_active_stream")
>   //Global windowing, the cause of exception should be above
>   .timeWindowAll(Time.seconds(10))
>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_
> window").uid("active_stream_window")
>
> val historyStream = airtrafficEvents
>   //Serialization to JsValue
>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").
> uid("map_history_stream")
>   //Global windowing, the cause of exception should be above
>   .timeWindowAll(Time.seconds(10))
>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_
> window").uid("history_stream_window")
>
>
>
> Regards,
> Federico
>
> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai :
>
>> Hi,
>>
>> I’m looking into this. Coul

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-09-29 Thread Federico D'Ambrosio
Hi Gordon,

I'm currently using Flink 1.3.2 in local mode.

If it's any help I realized from the log that the complete task which is
failing is:

2017-09-29 14:17:20,354 INFO
org.apache.flink.runtime.taskmanager.Task - latest_time
-> (map_active_stream, map_history_stream) (1/1)
(5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("time").name("latest_time").uid("latest_time")


val activeStream = events
  //Serialization to JsValue
  .map(event =>
event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new
MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")

val historyStream = airtrafficEvents
  //Serialization to JsValue
  .map(event =>
event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new
MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")



Regards,
Federico

2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai :

> Hi,
>
> I’m looking into this. Could you let us know the Flink version in which
> the exceptions occurred?
>
> Cheers,
> Gordon
>
>
> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
> federico.dambro...@smartlab.ws) wrote:
>
> Hi, I'm coming across these Exceptions while running a pretty simple flink 
> job.
>
> First one:
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark:
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
> at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
> at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
> The second one:
> java.io.IOException: Exception while applying ReduceFunction in reducing state
> at 
> org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
>
> Since it looks like something is wrong in Watermark processing, in my case 
> Watermarks are generated in my KafkaSource:
>
> val stream = env.addSource(
>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), 
> consumerConfig)
> .setStartFromLatest()
> .assignTimestampsAndWatermarks(
>   new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
> def extractTimestamp(element: AirTrafficEvent): Long =
>   element.instantValues.time.getMillis
>   })
> )
>
> These exceptions aren't really that informative per se and, from what I
> see, the task triggering these exceptions is the following operator:
>
> val events = keyedStreamByID
>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>
> What could be the problem here in your opinion? Is it not emitting
> watermarks correctly? I'm not even sure how I could reproduce these exceptions,
> since it looks like they happen pretty much randomly.
>
> Thank you all,
> Federico D'Ambrosio
>
>


-- 
Federico D'Ambrosio


ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-09-29 Thread Federico D'Ambrosio
Hi, I'm coming across these Exceptions while running a pretty simple flink job.

First one:
java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:
java.io.IOException: Exception while applying ReduceFunction in reducing state
at 
org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


Since it looks like something is wrong in the watermark processing: in my
case, watermarks are generated in my Kafka source:

val stream = env.addSource(
  new FlinkKafkaConsumer010[Event](topic, new
JSONDeserializationSchema(), consumerConfig)
.setStartFromLatest()
.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
def extractTimestamp(element: AirTrafficEvent): Long =
  element.instantValues.time.getMillis
  })
)

These exceptions aren't really that informative per se and, from what I
see, the task triggering these exceptions is the following operator:

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("timestamp").name("latest_time").uid("latest_time")

What could be the problem here, in your opinion? Is it not emitting
watermarks correctly? I'm not even sure how I could reproduce these
exceptions, since it looks like they happen pretty much randomly.

Thank you all,
Federico D'Ambrosio


Question about checkpointing with stateful operators and state recovery

2017-09-28 Thread Federico D'Ambrosio
Hi, I've got a few questions concerning the topics in the subject:

1. If an operator is applied to a keyed stream, do I still have
to implement the CheckpointedFunction trait and define the snapshotState
and initializeState methods in order to successfully recover the state
from a job failure?

2. While using a FlinkKafkaConsumer, enabling checkpointing allows
exactly once semantics end to end, provided that the sink is able to
guarantee the same. Do I have to set
setCommitOffsetsOnCheckpoints(true)? How would someone implement exactly
once semantics in a sink?

3. What are the advantages of externalized checkpoints, and in which
cases would I want to use them?

4. Let's suppose a scenario where checkpointing is enabled every 10
seconds, I have a Kafka consumer which is set to start from the latest
records, a sink providing at-least-once semantics, and a stateful keyed
operator in between the consumer and the sink (a configuration sketch for
this scenario follows below). Is it correct that, in case of a task
failure, the following happens?
   - the Kafka consumer gets reverted to the latest offset (does this
happen even if I don't set setCommitOffsetsOnCheckpoints(true)?)
   - the operator state gets reverted to the latest checkpoint
   - the sink is stateless, so it doesn't really care about what happened
   - the stream restarts, and probably some of the events coming to the
sink have already been processed before
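
For reference, this is roughly the setup I have in mind for points 2-4 (just
a sketch; the topic, group id and bootstrap servers are made-up placeholders):

import java.util.Properties

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment

// checkpoint every 10 seconds
env.enableCheckpointing(10000)
// keep the last checkpoint on cancellation, so it can be used for a manual restart
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

val consumerConfig = new Properties()
consumerConfig.setProperty("bootstrap.servers", "localhost:9092")
consumerConfig.setProperty("group.id", "my-group")

val consumer = new FlinkKafkaConsumer010[String](
  "my-topic", new SimpleStringSchema(), consumerConfig)
consumer.setStartFromLatest()
// offsets are committed back to Kafka only when a checkpoint completes
consumer.setCommitOffsetsOnCheckpoints(true)

val stream = env.addSource(consumer)
// ... stateful keyed operator and at-least-once sink would follow here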

Thank you for your attention,
Kind regards,
Federico


Re: Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

2017-09-25 Thread Federico D'Ambrosio
Hi Timo,

I didn't think about the JDBC connector (I had actually forgotten it was a
thing) and I'll surely look into it.
So far, I was trying to implement a simple sink (JSON only, for now)
starting from the base provided by the Storm HiveBolt implementation; my
goal was to make use of the Hive Streaming API.
I noticed that there are some potentially blocking calls in the Hive API,
for example when a TransactionBatch is being committed or the
StreamingConnection is being closed. In your opinion, what would be the
best way to deal with these kinds of calls in Flink? Wrapping them in an
AsyncFunction? Simply spawning a new thread?

Kind regards,
Federico

2017-09-25 16:43 GMT+02:00 Timo Walther :

> Hi Federico,
>
> I think going through a Storm compatibility layer could work, but did you
> thought about using the flink-jdbc connector? That should be the easiest
> solution.
>
> Otherwise I think it would be easier to quickly implement your our
> SinkFunction. It is just one method that you have to implement, you could
> call some Hive commands there.
>
> Regards,
> Timo
>
>
> Am 9/25/17 um 4:16 PM schrieb Nico Kruber:
>
> Hi Federico,
>> I also did not find any implementation of a hive sink, nor much details
>> on this
>> topic in general. Let me forward this to Timo and Fabian (cc'd) who may
>> know
>> more.
>>
>> Nico
>>
>> On Friday, 22 September 2017 12:14:32 CEST Federico D'Ambrosio wrote:
>>
>>> Hello everyone,
>>>
>>> I'd like to use the HiveBolt from storm-hive inside a flink job using the
>>> Flink-Storm compatibility layer but I'm not sure how to integrate it. Let
>>> me explain, I would have the following:
>>>
>>> val mapper = ...
>>>
>>> val hiveOptions = ...
>>>
>>> streamByID
>>>.transform[OUT]("hive-sink", new BoltWrapper[IN, OUT](new
>>> HiveBolt(hiveOptions)))
>>>
>>> where streamByID is a DataStream[Event].
>>>
>>> What would be the IN and OUT types? HiveBolt executes on a storm Tuple,
>>> so,
>>> I'd think that In should be an Event "tuple-d" ( event => (field1,
>>> field2,
>>> field3 ...) ), while OUT, since I don't want the stream to keep flowing
>>> would be null or None?
>>>
>>> Alternatively, do you know any implementation of an hive sink in Flink?
>>> Other than the adaptation of the said HiveBolt in a RichSinkFunction?
>>>
>>> Thanks for your attention,
>>>   Federico
>>>
>>
>
>


-- 
Federico D'Ambrosio


Re: Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-25 Thread Federico D'Ambrosio
As a little update, the pattern for the exclusion of those files in
sbt-assembly is the following:

assemblyMergeStrategy in assembly := {
  case PathList(ps @ _*) if ps.last.endsWith(".DSA") ||
                            ps.last.endsWith(".SF") ||
                            ps.last.endsWith(".RSA") =>
    MergeStrategy.discard
  // other MergeStrategies
}

2017-09-25 11:48 GMT+02:00 Federico D'Ambrosio <
federico.dambro...@smartlab.ws>:

> Hi Urs,
>
> Thank you very much for your advice, I will look into excluding those
> files directly during the assembly.
>
> 2017-09-25 10:58 GMT+02:00 Urs Schoenenberger  com>:
>
>> Hi Federico,
>>
>> oh, I remember running into this problem some time ago. If I recall
>> correctly, this is not a flink issue, but an issue with technically
>> incorrect jars from dependencies which prevent the verification of the
>> manifest. I was using the maven-shade plugin back then and configured an
>> exclusion for these file types. I assume that sbt/sbt-assembly has a
>> similar option, this should be more stable than manually stripping the
>> jar.
>> Alternatively, you could try to find out which dependency puts the
>> .SF/etc files there and exclude this dependency altogether, it might be
>> a transitive lib dependency that comes with hadoop anyways, or simply
>> one that you don't need anyways.
>>
>> Best,
>> Urs
>>
>> On 25.09.2017 10:09, Federico D'Ambrosio wrote:
>> > Hi Urs,
>> >
>> > Yes the main class is set, just like you said.
>> >
>> > Still, I might have managed to get it working: during the assembly some
>> > .SF, .DSA and .RSA files are put inside the META-INF folder of the jar,
>> > possibly coming from some of the new dependencies in the deps tree.
>> > Apparently, this caused this weird issue. Using an appropriate pattern
>> for
>> > discarding the files during the assembly or removing them via zip -d
>> should
>> > be enough (I sure hope so, since this is some of the worst issues I've
>> come
>> > across).
>> >
>> >
>> > Federico D'Ambrosio
>> >
>> > Il 25 set 2017 9:51 AM, "Urs Schoenenberger" <
>> urs.schoenenber...@tngtech.com>
>> > ha scritto:
>> >
>> >> Hi Federico,
>> >>
>> >> just guessing, but are you explicitly setting the Main-Class manifest
>> >> attribute for the jar that you are building?
>> >>
>> >> Should be something like
>> >>
>> >> mainClass in (Compile, packageBin) :=
>> >> Some("org.yourorg.YourFlinkJobMainClass")
>> >>
>> >> Best,
>> >> Urs
>> >>
>> >>
>> >> On 23.09.2017 17:53, Federico D'Ambrosio wrote:
>> >>> Hello everyone,
>> >>>
>> >>> I'd like to submit to you this weird issue I'm having, hoping you
>> could
>> >>> help me.
>> >>> Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink
>> 1.3.2
>> >>> compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
>> >>> So, I'm trying to implement an sink for Hive so I added the following
>> >>> dependency in my build.sbt:
>> >>>
>> >>> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
>> >>> "1.2.1000.2.6.1.0-129"
>> >>>
>> >>> in order to use hive streaming capabilities.
>> >>>
>> >>> After importing this dependency, not even using it, if I try to flink
>> run
>> >>> the job I get
>> >>>
>> >>> org.apache.flink.client.program.ProgramInvocationException: The
>> >> program's
>> >>> entry point class 'package.MainObj' was not found in the jar file.
>> >>>
>> >>> If I remove the dependency, everything goes back to normal.
>> >>> What is weird is that if I try to use sbt run in order to run job, *it
>> >> does
>> >>> find the Main class* and obviously crash because of the missing flink
>> >> core
>> >>> dependencies (AbstractStateBackend missing and whatnot).
>> >>>
>> >>> Here are the complete dependencies of the project:
>> >>>
>> >>> "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>> >>> "org.apache.flink" %% "flink-streaming-scala&

Re: Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-25 Thread Federico D'Ambrosio
Hi Urs,

Thank you very much for your advice, I will look into excluding those files
directly during the assembly.

2017-09-25 10:58 GMT+02:00 Urs Schoenenberger <
urs.schoenenber...@tngtech.com>:

> Hi Federico,
>
> oh, I remember running into this problem some time ago. If I recall
> correctly, this is not a flink issue, but an issue with technically
> incorrect jars from dependencies which prevent the verification of the
> manifest. I was using the maven-shade plugin back then and configured an
> exclusion for these file types. I assume that sbt/sbt-assembly has a
> similar option, this should be more stable than manually stripping the jar.
> Alternatively, you could try to find out which dependency puts the
> .SF/etc files there and exclude this dependency altogether, it might be
> a transitive lib dependency that comes with hadoop anyways, or simply
> one that you don't need anyways.
>
> Best,
> Urs
>
> On 25.09.2017 10:09, Federico D'Ambrosio wrote:
> > Hi Urs,
> >
> > Yes the main class is set, just like you said.
> >
> > Still, I might have managed to get it working: during the assembly some
> > .SF, .DSA and .RSA files are put inside the META-INF folder of the jar,
> > possibly coming from some of the new dependencies in the deps tree.
> > Apparently, this caused this weird issue. Using an appropriate pattern
> for
> > discarding the files during the assembly or removing them via zip -d
> should
> > be enough (I sure hope so, since this is some of the worst issues I've
> come
> > across).
> >
> >
> > Federico D'Ambrosio
> >
> > Il 25 set 2017 9:51 AM, "Urs Schoenenberger" <
> urs.schoenenber...@tngtech.com>
> > ha scritto:
> >
> >> Hi Federico,
> >>
> >> just guessing, but are you explicitly setting the Main-Class manifest
> >> attribute for the jar that you are building?
> >>
> >> Should be something like
> >>
> >> mainClass in (Compile, packageBin) :=
> >> Some("org.yourorg.YourFlinkJobMainClass")
> >>
> >> Best,
> >> Urs
> >>
> >>
> >> On 23.09.2017 17:53, Federico D'Ambrosio wrote:
> >>> Hello everyone,
> >>>
> >>> I'd like to submit to you this weird issue I'm having, hoping you could
> >>> help me.
> >>> Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink
> 1.3.2
> >>> compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
> >>> So, I'm trying to implement an sink for Hive so I added the following
> >>> dependency in my build.sbt:
> >>>
> >>> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> >>> "1.2.1000.2.6.1.0-129"
> >>>
> >>> in order to use hive streaming capabilities.
> >>>
> >>> After importing this dependency, not even using it, if I try to flink
> run
> >>> the job I get
> >>>
> >>> org.apache.flink.client.program.ProgramInvocationException: The
> >> program's
> >>> entry point class 'package.MainObj' was not found in the jar file.
> >>>
> >>> If I remove the dependency, everything goes back to normal.
> >>> What is weird is that if I try to use sbt run in order to run job, *it
> >> does
> >>> find the Main class* and obviously crash because of the missing flink
> >> core
> >>> dependencies (AbstractStateBackend missing and whatnot).
> >>>
> >>> Here are the complete dependencies of the project:
> >>>
> >>> "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
> >>> "org.apache.flink" %% "flink-streaming-scala" % flinkVersion %
> >> "provided",
> >>> "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
> >>> "org.apache.flink" %% "flink-cep-scala" % flinkVersion,
> >>> "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> >>> "1.2.1000.2.6.1.0-129",
> >>> "org.joda" % "joda-convert" % "1.8.3",
> >>> "com.typesafe.play" %% "play-json" % "2.6.2",
> >>> "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2",
> >>> "org.scalactic" %% "scalactic" % "3.0.1",
> >>> &qu

Re: Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-25 Thread Federico D'Ambrosio
Hi Urs,

Yes the main class is set, just like you said.

Still, I might have managed to get it working: during the assembly some
.SF, .DSA and .RSA files are put inside the META-INF folder of the jar,
possibly coming from some of the new dependencies in the deps tree.
Apparently, this caused this weird issue. Using an appropriate pattern for
discarding the files during the assembly or removing them via zip -d should
be enough (I sure hope so, since this is one of the worst issues I've come
across).


Federico D'Ambrosio

Il 25 set 2017 9:51 AM, "Urs Schoenenberger" 
ha scritto:

> Hi Federico,
>
> just guessing, but are you explicitly setting the Main-Class manifest
> attribute for the jar that you are building?
>
> Should be something like
>
> mainClass in (Compile, packageBin) :=
> Some("org.yourorg.YourFlinkJobMainClass")
>
> Best,
> Urs
>
>
> On 23.09.2017 17:53, Federico D'Ambrosio wrote:
> > Hello everyone,
> >
> > I'd like to submit to you this weird issue I'm having, hoping you could
> > help me.
> > Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink 1.3.2
> > compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
> > So, I'm trying to implement an sink for Hive so I added the following
> > dependency in my build.sbt:
> >
> > "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> > "1.2.1000.2.6.1.0-129"
> >
> > in order to use hive streaming capabilities.
> >
> > After importing this dependency, not even using it, if I try to flink run
> > the job I get
> >
> > org.apache.flink.client.program.ProgramInvocationException: The
> program's
> > entry point class 'package.MainObj' was not found in the jar file.
> >
> > If I remove the dependency, everything goes back to normal.
> > What is weird is that if I try to use sbt run in order to run job, *it
> does
> > find the Main class* and obviously crash because of the missing flink
> core
> > dependencies (AbstractStateBackend missing and whatnot).
> >
> > Here are the complete dependencies of the project:
> >
> > "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
> > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion %
> "provided",
> > "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
> > "org.apache.flink" %% "flink-cep-scala" % flinkVersion,
> > "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> > "1.2.1000.2.6.1.0-129",
> > "org.joda" % "joda-convert" % "1.8.3",
> > "com.typesafe.play" %% "play-json" % "2.6.2",
> > "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2",
> > "org.scalactic" %% "scalactic" % "3.0.1",
> > "org.scalatest" %% "scalatest" % "3.0.1" % "test",
> > "de.javakaffee" % "kryo-serializers" % "0.42"
> >
> > Could it be an issue of dependencies conflicts between mongo-hadoop and
> > hive hadoop versions (respectively 2.7.1 and  2.7.3.2.6.1.0-129, even
> > though no issue between mongodb-hadoop and flink)? I'm even starting to
> > think that Flink cannot handle that well big jars (before the new
> > dependency it was 44M, afterwards it became 115M) when it comes to
> > classpath loading?
> >
> > Any help would be really appreciated,
> > Kind regards,
> > Federico
> >
> >
> >
> > Hello everyone,
> >
> > I'd like to submit to you this weird issue I'm having, hoping you could
> > help me.
> > Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink 1.3.2
> > compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
> > So, I'm trying to implement an sink for Hive so I added the following
> > dependency in my build.sbt:
> >
> > "org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
> > "1.2.1000.2.6.1.0-129"
> >
> > in order to use hive streaming capabilities.
> >
> > After importing this dependency, not even using it, if I try to flink
> > run the job I get
> >
> > org.apache.flink.client.program.ProgramInvocationException: The
> > program's entry point class 'package.MainObj' was not found in the jar
> file.
> >
> > If I remove the dependency, everything g

Unable to run flink job after adding a new dependency (cannot find Main-class)

2017-09-23 Thread Federico D'Ambrosio
Hello everyone,

I'd like to submit to you this weird issue I'm having, hoping you could
help me.
Premise: I'm using sbt 0.13.6 for building, scala 2.11.8 and flink 1.3.2
compiled from sources against hadoop 2.7.3.2.6.1.0-129 (HDP 2.6)
I'm trying to implement a sink for Hive, so I added the following
dependency in my build.sbt:

"org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
"1.2.1000.2.6.1.0-129"

in order to use hive streaming capabilities.

After importing this dependency, not even using it, if I try to flink run
the job I get

org.apache.flink.client.program.ProgramInvocationException: The program's
entry point class 'package.MainObj' was not found in the jar file.

If I remove the dependency, everything goes back to normal.
What is weird is that if I try to use sbt run in order to run the job, *it
does find the Main class* and obviously crashes because of the missing
flink core dependencies (AbstractStateBackend missing and whatnot).

Here are the complete dependencies of the project:

"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
"org.apache.flink" %% "flink-cep-scala" % flinkVersion,
"org.apache.hive.hcatalog" % "hive-hcatalog-streaming" %
"1.2.1000.2.6.1.0-129",
"org.joda" % "joda-convert" % "1.8.3",
"com.typesafe.play" %% "play-json" % "2.6.2",
"org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2",
"org.scalactic" %% "scalactic" % "3.0.1",
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"de.javakaffee" % "kryo-serializers" % "0.42"

Could it be an issue of dependency conflicts between the mongo-hadoop and
Hive Hadoop versions (2.7.1 and 2.7.3.2.6.1.0-129 respectively, even though
there is no issue between mongo-hadoop and Flink)? I'm even starting to
think that Flink cannot handle big jars that well (before the new
dependency it was 44M, afterwards it became 115M) when it comes to
classpath loading.

Any help would be really appreciated,
Kind regards,
Federico


Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

2017-09-22 Thread Federico D'Ambrosio
Hello everyone,

I'd like to use the HiveBolt from storm-hive inside a Flink job using the
Flink-Storm compatibility layer, but I'm not sure how to integrate it. Let
me explain: I would have the following:

val mapper = ...

val hiveOptions = ...

streamByID
  .transform[OUT]("hive-sink", new BoltWrapper[IN, OUT](new
HiveBolt(hiveOptions)))

where streamByID is a DataStream[Event].

What would be the IN and OUT types? HiveBolt executes on a Storm Tuple, so
I'd think that IN should be an Event turned into a tuple (event => (field1,
field2, field3, ...)), while OUT, since I don't want the stream to keep
flowing, would be null or None?

Alternatively, do you know of any implementation of a Hive sink in Flink,
other than adapting the said HiveBolt into a RichSinkFunction?

Thanks for your attention,
 Federico


Re: [DISCUSS] Dropping Scala 2.10

2017-09-21 Thread Federico D'Ambrosio
Ok, thank you all for the clarification.

@Stephan: I'm using Kafka 0.10, so I guess the problem I had back then was
actually unrelated to the specific Kafka version.

Federico D'Ambrosio

Il 21 set 2017 16:30, "Stephan Ewen"  ha scritto:

> +1
>
> @ Frederico: I think Aljoscha is right - Flink only executes Kafka client
> code, which is Scala independent from 0.9 on. Do you use Kafka 0.8 still?
>
> On Wed, Sep 20, 2017 at 10:00 PM, Aljoscha Krettek 
> wrote:
>
>> Hi Federico,
>>
>> As far as I know, the Kafka client code has been rewritten in Java for
>> version 0.9, meaning there is no more Scala dependency in there. Only the
>> server (broker) code still contains Scala but it doesn't matter what Scala
>> version a client uses, if any.
>>
>> Best,
>> Aljoscha
>>
>> On 20. Sep 2017, at 14:32, Federico D'Ambrosio 
>> wrote:
>>
>> Hi, as far as I know some vendors like Hortonworks still use Kafka_2.10
>> as part of their hadoop distribution.
>> Could the use of a different scala version cause issues with the Kafka
>> connector? I'm asking because we are using HDP 2.6 and we once already had
>> some issue with conflicting scala versions concerning Kafka (though, we
>> were using Storm, I still haven't tested the Flink connector in this
>> context).
>>
>> Regards,
>> Federico
>>
>> 2017-09-20 14:19 GMT+02:00 Ted Yu :
>>
>>> +1
>>>
>>>  Original message 
>>> From: Hai Zhou 
>>> Date: 9/20/17 12:44 AM (GMT-08:00)
>>> To: Aljoscha Krettek , d...@flink.apache.org, user <
>>> user@flink.apache.org>
>>> Subject: Re: [DISCUSS] Dropping Scala 2.10
>>>
>>> +1
>>>
>>> > On 19 September 2017, at 17:56, Aljoscha Krettek  wrote:
>>> >
>>> > Hi,
>>> >
>>> > Talking to some people I get the impression that Scala 2.10 is quite
>>> outdated by now. I would like to drop support for Scala 2.10 and my main
>>> motivation is that this would allow us to drop our custom Flakka build of
>>> Akka that we use because newer Akka versions only support Scala 2.11/2.12
>>> and we need a backported feature.
>>> >
>>> > Are there any concerns about this?
>>> >
>>> > Best,
>>> > Aljoscha
>>>
>>>
>>
>>
>> --
>> Federico D'Ambrosio
>>
>>
>>
>


Re: [DISCUSS] Dropping Scala 2.10

2017-09-20 Thread Federico D'Ambrosio
Hi, as far as I know some vendors like Hortonworks still use Kafka_2.10 as
part of their hadoop distribution.
Could the use of a different scala version cause issues with the Kafka
connector? I'm asking because we are using HDP 2.6 and we once already had
some issue with conflicting scala versions concerning Kafka (though, we
were using Storm, I still haven't tested the Flink connector in this
context).

Regards,
Federico

2017-09-20 14:19 GMT+02:00 Ted Yu :

> +1
>
>  Original message 
> From: Hai Zhou 
> Date: 9/20/17 12:44 AM (GMT-08:00)
> To: Aljoscha Krettek , d...@flink.apache.org, user <
> user@flink.apache.org>
> Subject: Re: [DISCUSS] Dropping Scala 2.10
>
> +1
>
> > On 19 September 2017, at 17:56, Aljoscha Krettek  wrote:
> >
> > Hi,
> >
> > Talking to some people I get the impression that Scala 2.10 is quite
> outdated by now. I would like to drop support for Scala 2.10 and my main
> motivation is that this would allow us to drop our custom Flakka build of
> Akka that we use because newer Akka versions only support Scala 2.11/2.12
> and we need a backported feature.
> >
> > Are there any concerns about this?
> >
> > Best,
> > Aljoscha
>
>


-- 
Federico D'Ambrosio


Re: Dot notation not working for accessing case classes nested fields

2017-09-15 Thread Federico D'Ambrosio
Great, thanks!

The fact that it's actually written in the documentation is really
misleading.

Thank you very much for your response

Federico D'Ambrosio

Il 15 set 2017 13:26, "Gábor Gévay"  ha scritto:

> Hi Federico,
>
> Sorry, nested field expressions are not supported in these methods at
> the moment. I have created a JIRA issue for this:
> https://issues.apache.org/jira/browse/FLINK-7629
> I think this should be easy to fix, as all the infrastructure for
> supporting this is already in place. I'll try to do it over the
> weekend.
>
> Best,
> Gábor
>
>
>
>
> On Thu, Sep 14, 2017 at 3:51 PM, Federico D'Ambrosio
>  wrote:
> > Hi,
> >
> > I have the following case classes:
> >
> > case class Event(instantValues: InstantValues)
> > case class InstantValues(speed: Int, altitude: Int, time: DateTime)
> >
> >
> > in a DataStream[Event] I'd like to perform a maxBy operation on the field
> > time of instantValue for each event and according to the docs here it
> would
> > be possible to use the dot notation such the following:
> >
> > val events = stream
> >   .keyBy("otherField")
> >   .window(TumblingEventTimeWindows.of(Time.seconds(5)))
> >   .maxBy("instantValues.time")
> >
> > positionToMaxBy - In case of a POJO, Scala case class, or Tuple type, the
> > name of the public) field on which to perform the aggregation.
> Additionally,
> > a dot can be used to drill down into nested objects, as in
> "field1.fieldxy"
> > . Furthermore "*" can be specified in case of a basic type (which is
> > considered as having only one field).
> >
> >
> > Still, I'm getting the following error:
> >
> > Fields 'instantValues.time' are not valid for
> 'package.Event(instantValues:
> > package.InstantValues(speed: Integer, altitude: Integer, time:
> > GenericType))'
> >
> > whereas if, for instance, use only "instantValues" (while implementing
> its
> > compareTo method) the aggregation works as usual.
> >
> > Any idea as to why this isn't working? Am I doing something wrong?
> >
> > Thanks a lot,
> > Federico
>


Dot notation not working for accessing case classes nested fields

2017-09-14 Thread Federico D'Ambrosio
Hi,

I have the following case classes:

case class Event(instantValues: InstantValues)
case class InstantValues(speed: Int, altitude: Int, time: DateTime)


in a DataStream[Event] I'd like to perform a maxBy operation on the field
time of instantValues for each event, and according to the docs it should
be possible to use dot notation like the following:

val events = stream
  .keyBy("otherField")
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy("instantValues.time")

positionToMaxBy - In case of a POJO, Scala case class, or Tuple type, the
name of the (public) field on which to perform the aggregation.
Additionally, a dot can be used to drill down into nested objects, as in
"field1.fieldxy". Furthermore "*" can be specified in case of a basic type
(which is considered as having only one field).


Still, I'm getting the following error:

Fields 'instantValues.time' are not valid for 'package.Event(instantValues:
package.InstantValues(speed: Integer, altitude: Integer, time:
GenericType))'

whereas if, for instance, I use only "instantValues" (while implementing
its compareTo method), the aggregation works as usual.
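
For completeness, the workaround I mean looks roughly like this (just a
sketch; I added a placeholder otherField only to show the keyBy):

import org.joda.time.DateTime

case class InstantValues(speed: Int, altitude: Int, time: DateTime)
    extends Comparable[InstantValues] {
  // compare only on the timestamp, so maxBy("instantValues") keeps the most recent event
  override def compareTo(other: InstantValues): Int = time.compareTo(other.time)
}

case class Event(otherField: String, instantValues: InstantValues)

// and then the aggregation becomes:
//   stream
//     .keyBy("otherField")
//     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
//     .maxBy("instantValues")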

Any idea as to why this isn't working? Am I doing something wrong?

Thanks a lot,
Federico


Re: Is State access synchronized?

2017-09-11 Thread Federico D'Ambrosio
Hi,

Thank you very much, Chesnay, for this clarification.

2017-09-11 19:36 GMT+02:00 Chesnay Schepler :

> Hello,
>
> state is local to each parallel instance of an operator. Coupled with the
> fact that the "map" method is always called by the same thread (and never
> concurrently) the ValueState (or any state for that matter) will always
> return the latest values.
>
>
> On 10.09.2017 14:39, Federico D'Ambrosio wrote:
>
> Hi,
>
> as per the mail subject I wanted to ask you if a State access (read and
> write) is synchronized.
>
> I have the following stream:
>
> val airtrafficEvents = stream
> .keyBy(_.flightInfo.flight)
> .map(new UpdateIdFunction())
>
>
> where UpdateIdFunction is a RichMapFunction with a ValueState and a
> MapState, with the following map method
>
> def map(value: AirTrafficEvent): AirTrafficEventWithId = {
>
>   val flight = value.flightInfo.flight
>   val time = value.instantValues.time
>
>   AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis))
>
> }
>
> private def createOrGetId(_key: String, _time: Long): Int = {
>
>   val tmpId = valuestate.value
>
>   //Remove from MapState entries older than one minute
>
>   val entry = Option[(Int, Long)](lookupMap.get(_key))
>
>   //update ValueState or MapState if needed
>
>   //return current updated ValueState or corresponding ID from updated
> MapState
>
> }
>
> So, I'm using the MapState to track the integer IDs of the events of the
> stream, retaining only the latest records inside the MapState, and I'm
> using the ValueState to generate an incremental integer ID for said events.
> Given all of this, I'm really not sure how the mapping is applied to the
> keyedstream in input: is it guaranteed that each time the method is called
> I'm getting the latest and updated value/map?
>
> Thank you for your attention,
> Federico
>
>
>


-- 
Federico D'Ambrosio


Is State access synchronized?

2017-09-10 Thread Federico D'Ambrosio
Hi,

as per the mail subject, I wanted to ask whether state access (read and
write) is synchronized.

I have the following stream:

val airtrafficEvents = stream
.keyBy(_.flightInfo.flight)
.map(new UpdateIdFunction())


where UpdateIdFunction is a RichMapFunction with a ValueState and a
MapState, with the following map method

def map(value: AirTrafficEvent): AirTrafficEventWithId = {

  val flight = value.flightInfo.flight
  val time = value.instantValues.time

  AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis))

}

private def createOrGetId(_key: String, _time: Long): Int = {

  val tmpId = valuestate.value

  //Remove from MapState entries older than one minute

  val entry = Option[(Int, Long)](lookupMap.get(_key))

  //update ValueState or MapState if needed

  //return current updated ValueState or corresponding ID from updated MapState

}
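
A possible completion of the sketch above (illustrative only: the eviction and
ID-assignment details are assumptions, written as if valuestate were a
ValueState[Integer] and lookupMap a MapState[String, (Int, Long)]):

import scala.collection.JavaConverters._

private def createOrGetId(_key: String, _time: Long): Int = {

  val tmpId = Option(valuestate.value).map(_.intValue()).getOrElse(0)

  // Remove from MapState entries older than one minute
  val expired = lookupMap.entries().asScala
    .filter(e => _time - e.getValue._2 > 60000L)
    .map(_.getKey)
    .toList
  expired.foreach(lookupMap.remove)

  Option[(Int, Long)](lookupMap.get(_key)) match {
    case Some((id, _)) =>
      // known key: refresh its timestamp and return its existing ID
      lookupMap.put(_key, (id, _time))
      id
    case None =>
      // new key: assign the next incremental ID and remember it
      val newId = tmpId + 1
      valuestate.update(newId)
      lookupMap.put(_key, (newId, _time))
      newId
  }
}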

So, I'm using the MapState to track the integer IDs of the events of the
stream, retaining only the latest records inside the MapState, and I'm
using the ValueState to generate an incremental integer ID for said events.
Given all of this, I'm really not sure how the mapping is applied to the
keyedstream in input: is it guaranteed that each time the method is called
I'm getting the latest and updated value/map?

Thank you for your attention,
Federico


Re: BlobCache and its functioning

2017-08-31 Thread Federico D'Ambrosio
Ok, thank you very much!

So that was actually unrelated to what I was trying to do. I guess I'll have
to investigate further into the correctness of my OutputFormat implementation
then, because the total lack of other log lines was the most striking thing
about this whole issue.

Thank you again

2017-08-31 12:24 GMT+02:00 Nico Kruber :

> to sum up: the lines you were seeing seem to be the down- and upload of the
> TaskManager logs from the web interface which go through the BlobServer and
> its components.
>
>
> Nico
>
> On Thursday, 31 August 2017 11:51:27 CEST Federico D'Ambrosio wrote:
> > Hi,
> >
> > 1) I'm using Flink 1.3.2
> >
> > 2) The JobManager log is pretty much the same concerning those lines:
> >
> > 2017-08-30 14:16:53,343 INFO
> > org.apache.zookeeper.ClientCnxn   - Opening
> > socket connection to server master-1.localdomain/10.0.0.55:2181
> > 2017-08-30 14:16:53,344 INFO
> > org.apache.zookeeper.ClientCnxn   - Socket
> > connection established to master-1.localdomain/10.0.0.55:2181,
> initiating
> > session
> > 2017-08-30 14:16:53,348 INFO
> > org.apache.zookeeper.ClientCnxn   - Session
> > establishment complete on server master-1.localdomain/10.0.0.55:2181,
> > sessionid = 0x15e326a8fe6000e, negotiated timeout = 4
> > 2017-08-30 14:16:53,467 INFO
> > org.apache.flink.streaming.connectors.kinesis.internals.
> KinesisDataFetcher
> > - Subtask 0 has no active shards to read on startup; marking the subtask
> as
> > temporarily idle ...
> > 2017-08-30 14:16:53,469 INFO
> > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  -
> > Subtask 1 will be seeded with initial shard
> > StreamShardHandle{streamName='fdt', shard='{ShardId:
> > shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
> > 340282366920938463463374607431768211455},SequenceNumberRange:
> > {StartingSequenceNumber:
> > 49576318619583361934524589342704326364484033599642796034,}}'}, starting
> > state set as sequence number LATEST_SEQUENCE_NUM
> > 2017-08-30 14:16:53,470 INFO
> > org.apache.flink.streaming.connectors.kinesis.internals.
> KinesisDataFetcher
> > - Subtask 1 will start consuming seeded shard
> > StreamShardHandle{streamName='fdt', shard='{ShardId:
> > shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
> > 340282366920938463463374607431768211455},SequenceNumberRange:
> > {StartingSequenceNumber:
> > 49576318619583361934524589342704326364484033599642796034,}}'} from
> sequence
> > number LATEST_SEQUENCE_NUM with ShardConsumer 0
> > 2017-08-30 14:16:53,608 INFO
> > lab.vardata.HBaseBatchFormat  - Task 0:
> > Opening connection to currentDay to execute 1 tasks on Single Put job
> >
> > 2017-08-30 14:17:21,318 INFO  org.apache.flink.runtime.blob.
> > BlobCache   - Created BLOB cache storage directory
> > /tmp/blobStore-8a2a96af-b836-4c95-b79a-a4b80929126f
> > 2017-08-30 14:17:21,321 DEBUG org.apache.flink.runtime.blob.
> > BlobClient  - PUT content addressable BLOB stream to
> /
> > 127.0.0.1:59937
> > 2017-08-30 14:17:21,323 DEBUG org.apache.flink.runtime.blob.
> > BlobServerConnection- Received PUT request for content
> > addressable BLOB
> > 2017-08-30 14:17:21,324 INFO  org.apache.flink.runtime.blob.
> > BlobCache   - Downloading
> > 3ff486dff4c4eaafdab42b30a877326e62bfca82
> > from localhost/127.0.0.1:43268
> > 2017-08-30 14:17:21,324 DEBUG org.apache.flink.runtime.blob.
> > BlobClient  - GET content addressable BLOB
> > 3ff486dff4c4eaafdab42b30a877326e62bfca82 from /127.0.0.1:59938
> > 2017-08-30 14:18:13,708 DEBUG org.apache.flink.runtime.blob.
> > BlobClient  - PUT content addressable BLOB stream to
> /
> > 127.0.0.1:59976
> > 2017-08-30 14:18:13,708 DEBUG org.apache.flink.runtime.blob.
> > BlobServerConnection- Received PUT request for content
> > addressable BLOB
> > 2017-08-30 14:18:13,710 INFO  org.apache.flink.runtime.blob.
> > BlobCache   - Downloading
> > 2f5283326aab77faa047b705cd1d6470035b3b7d
> > from localhost/127.0.0.1:43268
> > 2017-08-30 14:18:13,710 DEBUG org.apache.flink.runtime.blob.
> > BlobClient  - GET content addressable BLOB
> > 2f5283326aab77faa047b705cd1d6470035b3b7d from /127.0.0.1:59978
> > 2017-08-3

Re: BlobCache and its functioning

2017-08-31 Thread Federico D'Ambrosio
127.0.0.1:43268
2017-08-30 15:15:24,638 DEBUG org.apache.flink.runtime.blob.
BlobClient  - GET content addressable BLOB
291f5aeb9306cb94098255237e01ab6735cf42ea from /127.0.0.1:33957
2017-08-30 15:21:04,189 DEBUG org.apache.flink.runtime.blob.
BlobClient  - PUT content addressable BLOB stream to /
127.0.0.1:34170
2017-08-30 15:21:04,189 DEBUG org.apache.flink.runtime.blob.
BlobServerConnection- Received PUT request for content
addressable BLOB
2017-08-30 15:21:04,191 INFO  org.apache.flink.runtime.blob.
BlobCache   - Downloading
e46b51d3bd0476b6a8a656469e7546cd933fa478
from localhost/127.0.0.1:43268
2017-08-30 15:21:04,191 DEBUG org.apache.flink.runtime.blob.
BlobClient  - GET content addressable BLOB
e46b51d3bd0476b6a8a656469e7546cd933fa478 from /127.0.0.1:34172
2017-08-30 15:22:10,141 DEBUG org.apache.flink.runtime.blob.
BlobClient  - PUT content addressable BLOB stream to /
127.0.0.1:34223
2017-08-30 15:22:10,142 DEBUG org.apache.flink.runtime.blob.
BlobServerConnection- Received PUT request for content
addressable BLOB
2017-08-30 15:22:10,144 INFO  org.apache.flink.runtime.blob.
BlobCache   - Downloading
cf9700032c229b39e634eeda73284e116314f7bb
from localhost/127.0.0.1:43268
2017-08-30 15:22:10,144 DEBUG org.apache.flink.runtime.blob.
BlobClient  - GET content addressable BLOB
cf9700032c229b39e634eeda73284e116314f7bb from /127.0.0.1:34225
2017-08-30 15:30:10,133 DEBUG org.apache.flink.runtime.blob.
BlobClient  - PUT content addressable BLOB stream to /
127.0.0.1:34544
2017-08-30 15:30:10,133 DEBUG org.apache.flink.runtime.blob.
BlobServerConnection- Received PUT request for content
addressable BLOB
2017-08-30 15:30:10,135 INFO  org.apache.flink.runtime.blob.
BlobCache   - Downloading
3b1d4b4d4820c2b74fd4a45d5741f2df940f5189
from localhost/127.0.0.1:43268
2017-08-30 15:30:10,136 DEBUG org.apache.flink.runtime.blob.
BlobClient  - GET content addressable BLOB
3b1d4b4d4820c2b74fd4a45d5741f2df940f5189 from /127.0.0.1:34546
2017-08-30 16:01:58,159 DEBUG org.apache.flink.runtime.blob.
BlobClient  - PUT content addressable BLOB stream to /
127.0.0.1:35741
2017-08-30 16:01:58,160 DEBUG org.apache.flink.runtime.blob.
BlobServerConnection- Received PUT request for content
addressable BLOB
2017-08-30 16:01:58,162 INFO  org.apache.flink.runtime.blob.
BlobCache   - Downloading
6bde2f7a709181065c6710c2252a5846f361ad68
from localhost/127.0.0.1:43268
2017-08-30 16:01:58,162 DEBUG org.apache.flink.runtime.blob.
BlobClient  - GET content addressable BLOB
6bde2f7a709181065c6710c2252a5846f361ad68 from /127.0.0.1:35743

3) There actually was CPU load, but I thought Flink was stuck in a loop or
something, because, as you can see from the timestamps in the logs, these
lines went on for a couple of hours with no sign of actual writes to HBase
(before crashing for unrelated reasons: the Kinesis stream was deleted). Since
I was writing to HBase, which I was accessing via Zookeeper, I was expecting
to see the log lines I put inside the writeRecord method of the OutputFormat.
As for the jstack trace, I'm currently unable to provide one (because I can't
access the Kinesis stream right now), but I'll try to emulate the stream and
provide that stack as soon as I can.

4) Yes, those were log lines from the TM log in the web dashboard

Thank you very much for your help


2017-08-31 10:29 GMT+02:00 Nico Kruber :

> Hi Federico,
> 1) Which version of Flink are you using?
> 2) Can you also share the JobManager log?
> 3) Why do you think, Flink is stuck at the BlobCache? Is it really
> blocked, or
> do you still have CPU load? Can you post stack traces of the TaskManager
> (TM)
> and JobManager processes when you think they are stuck (using jstack)?
> 4) These PUT requests in the TM logs are strange, unless you showed the TM
> logs in the web interface - did you?
>
>
> Nico
>
>
> On Thursday, 31 August 2017 09:45:59 CEST Fabian Hueske wrote:
> > Hi Federico,
> >
> > Not sure what's going on there but Nico (in CC) is more familiar with the
> > blob cache and might be able to help.
> >
> > Best, Fabian
> >
> > 2017-08-30 15:35 GMT+02:00 Federico D'Ambrosio :
> > > Hi,
> > >
> > > I have a rather simple Flink job which has a KinesisConsumer as a
> source
> > > and an HBase table as sink, in which I write using writeOutputFormat.
> I'm
> > > running it on a local machine with a single taskmanager (2 slots, 2G).
> The
> > > KinesisConsumer works fine and the connection to the HBase table gets
> > > opened fine (i.e. the open method of the class implementing
> OutputFo

BlobCache and its functioning

2017-08-30 Thread Federico D'Ambrosio
Hi,

I have a rather simple Flink job which has a KinesisConsumer as a source
and an HBase table as sink, in which I write using writeOutputFormat. I'm
running it on a local machine with a single taskmanager (2 slots, 2G). The
KinesisConsumer works fine and the connection to the HBase table gets
opened fine (i.e. the open method of the class implementing OutputFormat
gets actually called).

I'm running the job at a parallelism of 2, while the sink has a parallelism
of 1.
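
For context, a rough sketch of the job wiring (stream and table names are
taken from the logs below; the region, paths and the record-to-BatchContainer
mapping are placeholders, and the byte-array constructor of the custom format
is an assumption that follows the serialization fix discussed further down in
this archive):

import java.util.Properties

import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.hadoop.hbase.{HTableDescriptor, TableName}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)

val consumerConfig = new Properties()
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-west-1") // placeholder, plus AWS credentials settings

val records = env.addSource(
  new FlinkKinesisConsumer[String]("fdt", new SimpleStringSchema(), consumerConfig))

records
  .map(line => BatchContainer(Seq(("rowKey", "cf", "col", line.length)))) // placeholder mapping
  .writeUsingOutputFormat(new HBaseBatchFormat(
    new HTableDescriptor(TableName.valueOf("currentDay")).toByteArray,
    new Path("/path/to/hbase-site.xml")))
  .setParallelism(1) // sink at parallelism 1, rest of the job at 2

env.execute("kinesis-to-hbase")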

Still, looking at the log I see that after opening the connection, the job
gets stuck at lines like this one:

INFO  org.apache.flink.runtime.blob.BlobCache   -
Downloading 8638bdf78b0e540786de6c291f710a8db447a2b4 from
localhost/127.0.0.1:43268

Each one following another, like this:

2017-08-30 14:17:21,318 INFO  org.apache.flink.runtime.blob.BlobCache
 - Created BLOB cache storage directory
/tmp/blobStore-8a2a96af-b836-4c95-b79a-a4b80929126f
2017-08-30 14:17:21,321 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:59937
2017-08-30 14:17:21,323 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:17:21,324 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
3ff486dff4c4eaafdab42b30a877326e62bfca82 from
localhost/127.0.0.1:43268
2017-08-30 14:17:21,324 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
3ff486dff4c4eaafdab42b30a877326e62bfca82 from /127.0.0.1:59938
2017-08-30 14:18:13,708 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:59976
2017-08-30 14:18:13,708 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:18:13,710 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
2f5283326aab77faa047b705cd1d6470035b3b7d from
localhost/127.0.0.1:43268
2017-08-30 14:18:13,710 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
2f5283326aab77faa047b705cd1d6470035b3b7d from /127.0.0.1:59978
2017-08-30 14:19:29,811 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:60022
2017-08-30 14:19:29,812 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:19:29,814 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
f91fd7ecec6f90809f52ee189cb48aa1e30b04f6 from
localhost/127.0.0.1:43268
2017-08-30 14:19:29,814 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
f91fd7ecec6f90809f52ee189cb48aa1e30b04f6 from /127.0.0.1:60024
2017-08-30 14:21:42,856 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:60110
2017-08-30 14:21:42,856 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:21:42,858 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
8638bdf78b0e540786de6c291f710a8db447a2b4 from
localhost/127.0.0.1:43268
2017-08-30 14:21:42,859 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
8638bdf78b0e540786de6c291f710a8db447a2b4 from /127.0.0.1:60112
2017-08-30 14:26:11,242 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:60295
2017-08-30 14:26:11,243 DEBUG
org.apache.flink.runtime.blob.BlobServerConnection-
Received PUT request for content addressable BLOB
2017-08-30 14:26:11,247 INFO  org.apache.flink.runtime.blob.BlobCache
 - Downloading
6d30c88539d511bb9acc13b53bb2a128614f5621 from
localhost/127.0.0.1:43268
2017-08-30 14:26:11,247 DEBUG org.apache.flink.runtime.blob.BlobClient
 - GET content addressable BLOB
6d30c88539d511bb9acc13b53bb2a128614f5621 from /127.0.0.1:60297
2017-08-30 14:29:20,942 DEBUG org.apache.flink.runtime.blob.BlobClient
 - PUT content addressable BLOB stream to
/127.0.0.1:60410


My questions are: what is the JobManager doing here? Why is it taking ages
to do this? How do I speed up this behaviour?

Thank you very much for your attention,

Federico D'Ambrosio


Re: Flink session on Yarn - ClassNotFoundException

2017-08-30 Thread Federico D'Ambrosio
Hi,
What is your "hadoop version" output? I'm asking because you said your
Hadoop distribution is in /usr/hdp, so it looks like you're using
Hortonworks HDP, just like me. This would be a third-party distribution,
so you'd need to build Flink from source according to this:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/building.html#vendor-specific-versions

Federico D'Ambrosio

On 30 Aug 2017 at 13:33, "albert"  wrote:

> Hi Chesnay,
>
> Thanks for your reply. I did download the binaries matching my Hadoop
> version (2.7), that's why I was wondering if the issue had something to do
> with the exact hadoop version flink is compiled again, or if there might be
> things that are missing in my environment.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: The implementation of the RichSinkFunction is not serializable.

2017-08-28 Thread Federico D'Ambrosio
Hello everyone,

I solved my issue by using an Array[Byte] parameter instead of the explicit
HTableDescriptor parameter. This way I can instantiate the HTableDescriptor
inside the open method of the OutputFormat using the static method
HTableDescriptor.parseFrom. In the end, marking conf, table and connection as
transient wouldn't have made any difference.
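
In code, the relevant part now looks roughly like this (a simplified sketch,
without logging and with imports added for clarity):

import java.io.IOException

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Table}

abstract class HBaseOutputFormat[T](tableDescriptorBytes: Array[Byte],
                                    confPath: Path) extends OutputFormat[T] {

  var conf: org.apache.hadoop.conf.Configuration = _
  var connection: Connection = _
  var table: Table = _

  @throws[IOException]
  def configure(parameters: Configuration): Unit = {
    conf = HBaseConfiguration.create()
    conf.addResource(confPath.getPath)
    connection = ConnectionFactory.createConnection(conf)
  }

  @throws[IOException]
  def open(taskNumber: Int, numTasks: Int): Unit = {
    // rebuild the descriptor on the worker from the serialized bytes
    val tableDescriptor = HTableDescriptor.parseFrom(tableDescriptorBytes)
    val admin = connection.getAdmin
    if (!admin.tableExists(tableDescriptor.getTableName))
      admin.createTable(tableDescriptor)
    table = connection.getTable(tableDescriptor.getTableName)
  }

  @throws[IOException]
  def close(): Unit = table.close()
}

// On the client side the bytes can be produced with, e.g. (table name is just an example):
//   val descriptorBytes = new HTableDescriptor(TableName.valueOf("currentDay")).toByteArray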

Regards

2017-08-27 14:22 GMT+02:00 Federico D'Ambrosio <
federico.dambro...@smartlab.ws>:

> Hi,
>
> could you elaborate, please? Marking conf, connection and table as
> transient wouldn't help because of the presence of the HTableDescriptor
> reference?
>
> 2017-08-27 12:44 GMT+02:00 Jörn Franke :
>
>> It looks like that in your case everything should be serializable. An
>> alternative would be to mark certain non-serializable things as transient,
>> but as far as I see this is not possible in your case.
>>
>> On 27. Aug 2017, at 11:02, Federico D'Ambrosio <
>> federico.dambro...@smartlab.ws> wrote:
>>
>> Hi,
>>
>> I'm trying to write on HBase using writeOutputFormat using a custom HBase
>> format inspired from this example
>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java>
>> in flink-hbase (mind you, I'm using Scala instead of Java) and encountering
>> the error reported in the mail object.
>>
>> Now, the OutputFormat I'm using is the following:
>>
>> abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor, 
>> confPath : Path) extends OutputFormat[T]{
>>
>>   private val LOG = LoggerFactory.getLogger(this.getClass)
>>
>>   var conf : org.apache.hadoop.conf.Configuration = _
>>   var connection : Connection = _
>>   var table : Table = _
>>   var taskNumber : String = _
>>
>>   @throws[IOException]
>>   def configure(parameters: Configuration): Unit = {
>> conf = HBaseConfiguration.create()
>> conf.addResource(confPath.getPath)
>> connection = ConnectionFactory.createConnection(conf)
>>   }
>>
>>
>>   @throws[IOException]
>>   def close(): Unit = {
>> table.close()
>>
>>   }
>>
>>
>>   @throws[IOException]
>>   def open(taskNumber: Int, numTasks: Int): Unit = {
>> this.taskNumber = String.valueOf(taskNumber)
>> val admin = connection.getAdmin
>>
>> if(!admin.tableExists(tableDescriptor.getTableName))
>>   admin.createTable(tableDescriptor)
>>
>> table = connection.getTable(tableDescriptor.getTableName)
>>
>>   }
>> }
>>
>> which is inherited by the actual format used, that implements the 
>> writeRecord method
>>
>>
>> class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath : Path)
>>   extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)
>>
>> with BatchContainer being
>>
>> case class BatchContainer(batch: Iterable[(String, String, String, Int)]) 
>> extends Serializable
>>
>> I'd like to ask you: what needs to be Serializable? As far as I see,
>> conf, connection and table are not Serializable and so they are surely part
>> of the issue. Are the constructor parameters, especially tableDescriptor
>> which is not Serializable, to be considered in this case? Should all the
>> methods implemented from the OutputFormat interface contain only
>> Serializable variables?
>>
>> Thank you for you attention,
>> Federico
>>
>>
>


Re: The implementation of the RichSinkFunction is not serializable.

2017-08-27 Thread Federico D'Ambrosio
Hi,

could you elaborate, please? Marking conf, connection and table as
transient wouldn't help because of the presence of the HTableDescriptor
reference?

2017-08-27 12:44 GMT+02:00 Jörn Franke :

> It looks like that in your case everything should be serializable. An
> alternative would be to mark certain non-serializable things as transient,
> but as far as I see this is not possible in your case.
>
> On 27. Aug 2017, at 11:02, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
> Hi,
>
> I'm trying to write on HBase using writeOutputFormat using a custom HBase
> format inspired from this example
> <https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java>
> in flink-hbase (mind you, I'm using Scala instead of Java) and encountering
> the error reported in the mail object.
>
> Now, the OutputFormat I'm using is the following:
>
> abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor, 
> confPath : Path) extends OutputFormat[T]{
>
>   private val LOG = LoggerFactory.getLogger(this.getClass)
>
>   var conf : org.apache.hadoop.conf.Configuration = _
>   var connection : Connection = _
>   var table : Table = _
>   var taskNumber : String = _
>
>   @throws[IOException]
>   def configure(parameters: Configuration): Unit = {
> conf = HBaseConfiguration.create()
> conf.addResource(confPath.getPath)
> connection = ConnectionFactory.createConnection(conf)
>   }
>
>
>   @throws[IOException]
>   def close(): Unit = {
> table.close()
>
>   }
>
>
>   @throws[IOException]
>   def open(taskNumber: Int, numTasks: Int): Unit = {
> this.taskNumber = String.valueOf(taskNumber)
> val admin = connection.getAdmin
>
> if(!admin.tableExists(tableDescriptor.getTableName))
>   admin.createTable(tableDescriptor)
>
> table = connection.getTable(tableDescriptor.getTableName)
>
>   }
> }
>
> which is inherited by the actual format used, that implements the writeRecord 
> method
>
>
> class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath : Path)
>   extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)
>
> with BatchContainer being
>
> case class BatchContainer(batch: Iterable[(String, String, String, Int)]) 
> extends Serializable
>
> I'd like to ask you: what needs to be Serializable? As far as I see, conf,
> connection and table are not Serializable and so they are surely part of
> the issue. Are the constructor parameters, especially tableDescriptor which
> is not Serializable, to be considered in this case? Should all the methods
> implemented from the OutputFormat interface contain only Serializable
> variables?
>
> Thank you for you attention,
> Federico
>
>


The implementation of the RichSinkFunction is not serializable.

2017-08-27 Thread Federico D'Ambrosio
Hi,

I'm trying to write to HBase using writeOutputFormat with a custom HBase
format inspired by this example
<https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java>
in flink-hbase (mind you, I'm using Scala instead of Java), and I'm
encountering the error reported in the mail subject.

Now, the OutputFormat I'm using is the following:

abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor,
                                    confPath : Path) extends OutputFormat[T] {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  var conf : org.apache.hadoop.conf.Configuration = _
  var connection : Connection = _
  var table : Table = _
  var taskNumber : String = _

  @throws[IOException]
  def configure(parameters: Configuration): Unit = {
    conf = HBaseConfiguration.create()
    conf.addResource(confPath.getPath)
    connection = ConnectionFactory.createConnection(conf)
  }

  @throws[IOException]
  def close(): Unit = {
    table.close()
  }

  @throws[IOException]
  def open(taskNumber: Int, numTasks: Int): Unit = {
    this.taskNumber = String.valueOf(taskNumber)
    val admin = connection.getAdmin

    if (!admin.tableExists(tableDescriptor.getTableName))
      admin.createTable(tableDescriptor)

    table = connection.getTable(tableDescriptor.getTableName)
  }
}

which is extended by the actual format I use, which implements the
writeRecord method:


class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath : Path)
  extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)

with BatchContainer being

case class BatchContainer(batch: Iterable[(String, String, String,
Int)]) extends Serializable

I'd like to ask you: what needs to be Serializable? As far as I see, conf,
connection and table are not Serializable and so they are surely part of
the issue. Are the constructor parameters, especially tableDescriptor which
is not Serializable, to be considered in this case? Should all the methods
implemented from the OutputFormat interface contain only Serializable
variables?

Thank you for your attention,
Federico


Re: Possible conflict between in Flink connectors

2017-08-25 Thread Federico D'Ambrosio
Hi,

At the beginning I was wondering that myself too, and I don't know why
hbase-common wasn't being downloaded and included, so I added it explicitly.

I was just about to write that I may have solved this weird issue: apparently
the shading worked, and the ClassDefNotFound issue was caused by the
hbase-client jar missing from the fat jar, even though it should have been
there! So, I added the explicit dependency for it back into the build.sbt and
now it's working.
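
For reference, the dependency line added back to build.sbt looks like this
(the version is assumed to match the hbase-common 1.3.0 dependency already
listed further down in this thread):

libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.3.0"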


2017-08-25 23:03 GMT+02:00 Robert Metzger :

> Hi,
>
> why do you need to add hbase-common as a separate dependency? Doesn't the
> "flink-hbase" dependency transitively pull in hbase?
>
> On Fri, Aug 25, 2017 at 6:35 PM, Ted Yu  wrote:
>
>> If Guava 18.0 is used to build hbase 1.3, there would be compilation
>> errors such as the following:
>>
>> [ERROR] /mnt/disk2/a/1.3-h/hbase-server/src/main/java/org/apache/had
>> oop/hbase/replication/regionserver/ReplicationSource.java:[271,25]
>> error: cannot find symbol
>> [ERROR]   symbol:   method stopAndWait()
>> [ERROR]   location: variable replicationEndpoint of type
>> ReplicationEndpoint
>> [ERROR] /mnt/disk2/a/1.3-h/hbase-server/src/main/java/org/apache/had
>> oop/hbase/replication/regionserver/ReplicationSource.java:[281,47]
>> error: cannot find symbol
>> [ERROR]   symbol:   method start()
>> [ERROR]   location: variable replicationEndpoint of type
>> ReplicationEndpoint
>>
>> Maybe you can shade the guava dependency in hbase 1.3
>>
>> In the upcoming hbase 2.0 release, third party dependencies such as guava
>> and netty are shaded. Meaning there wouldn't be such conflict with other
>> components you may use.
>>
>> On Fri, Aug 25, 2017 at 8:36 AM, Federico D'Ambrosio <
>> federico.dambro...@smartlab.ws> wrote:
>>
>>> Hello everyone, I'm new to Flink and am encountering a nasty problem
>>> while trying to submit a streaming Flink Job. I'll try to explain it as
>>> thoroughly as possible.
>>>
>>> Premise: I'm using an HDP 2.6 hadoop cluster, with hadoop version
>>> 2.7.3.2.6.1.0-129, Flink compiled from sources accordingly (maven 3.0.5) as
>>> per documentation
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/building.html#vendor-specific-versions>
>>> and I submit the job using yarn-session.sh and then flink run.
>>>
>>> Getting into more details, the Flink Job is a fat jar built with sbt
>>> assembly and the following dependencies:
>>>
>>> libraryDependencies += "org.apache.flink" %% "flink-scala" % flinkVersion % 
>>> "provided"
>>> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % 
>>> flinkVersion % "provided"
>>> libraryDependencies += "org.apache.flink" %% "flink-connector-kinesis" % 
>>> flinkVersion % "provided"
>>> libraryDependencies += "org.apache.flink" %% "flink-hbase" % flinkVersion % 
>>> "provided"
>>> libraryDependencies += "org.apache.flink" %% "flink-connector-filesystem" % 
>>> flinkVersion % "provided"
>>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.3.0"
>>> libraryDependencies += "org.joda" % "joda-convert" % "1.8.3"
>>> libraryDependencies += "com.typesafe.play" %% "play-json" % "2.6.2"
>>>
>>>
>>> assemblyOption in assembly := (assemblyOption in 
>>> assembly).value.copy(includeScala = false)
>>>
>>>
>>> inside the flink lib folder I have the following Jars:
>>>
>>> flink-connector-filesystem_2.10-1.3.2.jar
>>> flink-python_2.10-1.3.2.jar
>>> flink-connector-kinesis_2.10-1.3.2.jar
>>> flink-shaded-hadoop2-uber-1.3.2.jar
>>> flink-dist_2.10-1.3.2.jar
>>> log4j-1.2.17.jar
>>> flink-hbase_2.10-1.3.2.jar
>>> slf4j-log4j12-1.7.7.jar
>>>
>>> Right now, the issue is that when I flink run the job, I get
>>> NoClassDefFound for org/apache/hadoop/hbase/client/Put class, despite
>>> it being inside the fat jar. So, I tried putting both hbase-common.jar and
>>> hbase-client.jars inside the lib folder, getting as a result another
>>> NoClassDefFound, only, this time, for com/google/common/collect/List
>>> Multimap. Now, I noticed that flink-connector-kinesis and all the hba

Possible conflict between in Flink connectors

2017-08-25 Thread Federico D'Ambrosio
 +- org.apache.hbase:hbase-protocol:1.3.0
(depends on 4.12)
[warn] +- org.apache.hbase:hbase-annotations:1.3.0
(depends on 4.12)
[warn] +- org.apache.hbase:hbase-prefix-tree:1.3.0
(depends on 4.12)
[warn] +- org.apache.hbase:hbase-procedure:1.3.0
(depends on 4.12)
[warn] +- org.apache.hbase:hbase-client:1.3.0
(depends on 4.12)
[warn]     +- org.apache.hbase:hbase-common:1.3.0
(depends on 4.12)
[warn] +- org.apache.hbase:hbase-server:1.3.0
(depends on 4.12)
[warn] +- jline:jline:0.9.94
(depends on 3.8.1)

And so here I am, asking for help.

Thank you very much for your attention,
Kind regards,

Federico D'Ambrosio