Flink: one transformation ends, the next transformation starts

2017-03-30 Thread rimin515
hi all, I run a job like this:

val data = env.readTextFile("hdfs:///") // DataSet[(String, Array[String])]
val dataVec = computeDataVect(data) // DataSet[(String, Int, Array[(Int, Double)])]
val rescomm = computeCosSims(dataVec) // DataSet[(String, Array[(String, Double)])]

When it runs on the YARN cluster the result is wrong, although the job itself finishes successfully; when it runs locally, in Eclipse on my computer, the result is correct.

So I ran it in two steps instead. First:

val data = env.readTextFile("hdfs:///") // DataSet[(String, Array[String])]
val dataVec = computeDataVect(data) // DataSet[(String, Int, Array[(Int, Double)])]
dataVec.writeAsText("hdfs:///vec") // the vectors written here are correct

Second:

val readVec = env.readTextFile("hdfs:///vec").map(...) // DataSet[(String, Int, Array[(Int, Double)])]
val rescomm = computeCosSims(readVec) // DataSet[(String, Array[(String, Double)])]

and this result is correct, the same as the local run in Eclipse.

Can someone help me solve this problem?


Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-30 Thread Kamil Dziublinski
Hey guys,

Sorry for the confusion; it turned out that I had a bug in my code: I was
not clearing the list in my batch object on each apply call. I forgot that
this has to be done, since apply works differently than fold. That is what
led to the inflated throughput. When I fixed it I was back to 160k per
sec. I'm still investigating how I can speed it up.

As a side note, it's quite interesting that HBase was able to take 2 million
puts per second. But most of those puts had already been stored by a previous
call, so perhaps internally it can distinguish in memory whether a put was
already stored or not. Not sure.

Anyway my claim about window vs fold performance difference was wrong. So
forget about it ;)
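
For completeness, a minimal sketch of the corrected window-function pattern (the Mutation and UserBatch types and the HBaseBatchSink are hypothetical stand-ins for my real classes); the point is that the batch is built fresh inside every apply() call instead of reusing a mutable list across calls, which was my bug:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Mutation(tenant: String, user: String, payload: String)
case class UserBatch(tenant: String, user: String, mutations: Seq[Mutation])

class BatchingWindowFunction
    extends WindowFunction[Mutation, UserBatch, (String, String), TimeWindow] {
  override def apply(key: (String, String),
                     window: TimeWindow,
                     input: Iterable[Mutation],
                     out: Collector[UserBatch]): Unit = {
    // A fresh collection per window firing, so nothing leaks in from a previous window.
    out.collect(UserBatch(key._1, key._2, input.toSeq))
  }
}

// usage, assuming `mutations: DataStream[Mutation]` and an HBaseBatchSink of your own:
// mutations
//   .keyBy(m => (m.tenant, m.user))
//   .timeWindow(Time.seconds(3))
//   .apply(new BatchingWindowFunction)
//   .addSink(new HBaseBatchSink)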

On Wed, Mar 29, 2017 at 12:21 PM, Timo Walther  wrote:

> Hi Kamil,
>
> the performance implications might be the result of which state the
> underlying functions are using internally. WindowFunctions use ListState or
> ReducingState, fold() uses FoldingState. It also depends on the size of
> your state and the state backend you are using. I recommend the following
> documentation page. The FoldingState might be deprecated soon, once a
> better alternative is available: https://ci.apache.org/
> projects/flink/flink-docs-release-1.2/dev/stream/state.
> html#using-managed-keyed-state
>
> I hope that helps.
>
> Regards,
> Timo
>
> On 29/03/17 at 11:27, Kamil Dziublinski wrote:
>
> Hi guys,
>
> I’m using flink on production in Mapp. We recently swapped from storm.
> Before I have put this live I was doing performance tests and I found
> something that “feels” a bit off.
> I have a simple streaming job reading from kafka, doing window for 3
> seconds and then storing into hbase.
>
> Initially we had this second step written with a fold function, since I
> thought performance and resource wise it’s a better idea.
> But I couldn’t reach more than 120k writes per second to HBase and I
> thought hbase sink is a bottlenck here. But then I tried doing the same
> with window function and my performance jumped to 2 millions writes per
> second. Just wow :) Comparing to storm where I had max 320k per second it
> is amazing.
>
> Both fold and window functions were doing the same thing, taking together
> all the records for the same tenant and user (key by is used for that) and
> putting it in one batched object with arraylists for the mutations on user
> profile. After that passing this object to the sink. I can post the code if
> its needed.
>
> In case of fold I was just adding profile mutation to the list and in case
> of window function iterating over all of it and returning this batched
> entity in one go.
>
> I’m wondering if this is expected to have 20 times slower performance just
> by using fold function. I would like to know what is so costly about this,
> as intuitively I would expect fold function being a better choice here
> since I assume that window function is using more memory for buffering.
>
> Also my colleagues when they were doing PoC on flink evaluation they were
> seeing very similar results to what I am seeing now. But they were still
> using fold function. This was on flink version 1.0.3 and now I am using
> 1.2.0. So perhaps there is some regression?
>
> Please let me know what you think.
>
> Cheers,
> Kamil.
>
>
>


Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-30 Thread Ted Yu
Kamil:
In the upcoming hbase 2.0 release, there are more write path optimizations 
which would boost write performance further. 

FYI 

> On Mar 30, 2017, at 1:07 AM, Kamil Dziublinski  
> wrote:
> 
> Hey guys,
> 
> Sorry for confusion it turned out that I had a bug in my code, when I was not 
> clearing this list in my batch object on each apply call. Forgot it has to be 
> added since its different than fold.
> Which led to so high throughput. When I fixed this I was back to 160k per 
> sec. I'm still investigating how I can speed it up.
> 
> As a side note its quite interesting that hbase was able to do 2millions puts 
> per second. But most of them were already stored with previous call so 
> perhaps internally he is able to distinguish in memory if a put was stored or 
> not. Not sure.
> 
> Anyway my claim about window vs fold performance difference was wrong. So 
> forget about it ;)
> 
>> On Wed, Mar 29, 2017 at 12:21 PM, Timo Walther  wrote:
>> Hi Kamil,
>> 
>> the performance implications might be the result of which state the 
>> underlying functions are using internally. WindowFunctions use ListState or 
>> ReducingState, fold() uses FoldingState. It also depends on the size of your 
>> state and the state backend you are using. I recommend the following 
>> documentation page. The FoldingState might be deprecated soon, once a better 
>> alternative is available: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state
>> 
>> I hope that helps.
>> 
>> Regards,
>> Timo
>> 
>>> On 29/03/17 at 11:27, Kamil Dziublinski wrote:
>>> Hi guys,
>>> 
>>> I’m using flink on production in Mapp. We recently swapped from storm.
>>> Before I have put this live I was doing performance tests and I found 
>>> something that “feels” a bit off.
>>> I have a simple streaming job reading from kafka, doing window for 3 
>>> seconds and then storing into hbase.
>>> 
>>> Initially we had this second step written with a fold function, since I 
>>> thought performance and resource wise it’s a better idea. 
>>> But I couldn’t reach more than 120k writes per second to HBase and I 
>>> thought hbase sink is a bottlenck here. But then I tried doing the same 
>>> with window function and my performance jumped to 2 millions writes per 
>>> second. Just wow :) Comparing to storm where I had max 320k per second it 
>>> is amazing.
>>> 
>>> Both fold and window functions were doing the same thing, taking together 
>>> all the records for the same tenant and user (key by is used for that) and 
>>> putting it in one batched object with arraylists for the mutations on user 
>>> profile. After that passing this object to the sink. I can post the code if 
>>> its needed. 
>>> 
>>> In case of fold I was just adding profile mutation to the list and in case 
>>> of window function iterating over all of it and returning this batched 
>>> entity in one go.
>>> 
>>> I’m wondering if this is expected to have 20 times slower performance just 
>>> by using fold function. I would like to know what is so costly about this, 
>>> as intuitively I would expect fold function being a better choice here 
>>> since I assume that window function is using more memory for buffering.
>>> 
>>> Also my colleagues when they were doing PoC on flink evaluation they were 
>>> seeing very similar results to what I am seeing now. But they were still 
>>> using fold function. This was on flink version 1.0.3 and now I am using 
>>> 1.2.0. So perhaps there is some regression?
>>> 
>>> Please let me know what you think.
>>> 
>>> Cheers,
>>> Kamil.
> 


Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-30 Thread Kamil Dziublinski
Thanks Ted, will read about it.

While we are on throughput: do you have any suggestions on how to optimise
reading from Kafka in Flink?
In my current setup:
Flink runs on 15 machines on YARN.
Kafka runs on 9 brokers with 40 partitions, and the source parallelism in
Flink is 40.
Just for testing I left only filters in the job, without a sink, to see the
maximum throughput.
I am getting at most 800-900k records per sec, and definitely not saturating
the 1 Gb/s network; I am only using about 20-30% of the network bandwidth.

I'm wondering what I can tweak further to increase this. I was reading in
this blog:
https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 million records per sec with only 20 partitions, so I'm sure I should
be able to squeeze more out of it.

On Thu, Mar 30, 2017 at 11:51 AM, Ted Yu  wrote:

> Kamil:
> In the upcoming hbase 2.0 release, there are more write path optimizations
> which would boost write performance further.
>
> FYI
>
> On Mar 30, 2017, at 1:07 AM, Kamil Dziublinski <
> kamil.dziublin...@gmail.com> wrote:
>
> Hey guys,
>
> Sorry for confusion it turned out that I had a bug in my code, when I was
> not clearing this list in my batch object on each apply call. Forgot it has
> to be added since its different than fold.
> Which led to so high throughput. When I fixed this I was back to 160k per
> sec. I'm still investigating how I can speed it up.
>
> As a side note its quite interesting that hbase was able to do 2millions
> puts per second. But most of them were already stored with previous call so
> perhaps internally he is able to distinguish in memory if a put was stored
> or not. Not sure.
>
> Anyway my claim about window vs fold performance difference was wrong. So
> forget about it ;)
>
> On Wed, Mar 29, 2017 at 12:21 PM, Timo Walther  wrote:
>
>> Hi Kamil,
>>
>> the performance implications might be the result of which state the
>> underlying functions are using internally. WindowFunctions use ListState or
>> ReducingState, fold() uses FoldingState. It also depends on the size of
>> your state and the state backend you are using. I recommend the following
>> documentation page. The FoldingState might be deprecated soon, once a
>> better alternative is available: https://ci.apache.org/projects
>> /flink/flink-docs-release-1.2/dev/stream/state.html#using-
>> managed-keyed-state
>>
>> I hope that helps.
>>
>> Regards,
>> Timo
>>
>> On 29/03/17 at 11:27, Kamil Dziublinski wrote:
>>
>> Hi guys,
>>
>> I’m using flink on production in Mapp. We recently swapped from storm.
>> Before I have put this live I was doing performance tests and I found
>> something that “feels” a bit off.
>> I have a simple streaming job reading from kafka, doing window for 3
>> seconds and then storing into hbase.
>>
>> Initially we had this second step written with a fold function, since I
>> thought performance and resource wise it’s a better idea.
>> But I couldn’t reach more than 120k writes per second to HBase and I
>> thought hbase sink is a bottlenck here. But then I tried doing the same
>> with window function and my performance jumped to 2 millions writes per
>> second. Just wow :) Comparing to storm where I had max 320k per second it
>> is amazing.
>>
>> Both fold and window functions were doing the same thing, taking together
>> all the records for the same tenant and user (key by is used for that) and
>> putting it in one batched object with arraylists for the mutations on user
>> profile. After that passing this object to the sink. I can post the code if
>> its needed.
>>
>> In case of fold I was just adding profile mutation to the list and in
>> case of window function iterating over all of it and returning this batched
>> entity in one go.
>>
>> I’m wondering if this is expected to have 20 times slower performance
>> just by using fold function. I would like to know what is so costly about
>> this, as intuitively I would expect fold function being a better choice
>> here since I assume that window function is using more memory for buffering.
>>
>> Also my colleagues when they were doing PoC on flink evaluation they were
>> seeing very similar results to what I am seeing now. But they were still
>> using fold function. This was on flink version 1.0.3 and now I am using
>> 1.2.0. So perhaps there is some regression?
>>
>> Please let me know what you think.
>>
>> Cheers,
>> Kamil.
>>
>>
>>
>


Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-30 Thread Tzu-Li (Gordon) Tai
I'm wondering what I can tweak further to increase this. I was reading in this 
blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 millions per sec with only 20 partitions. So i'm sure I should be able 
to squeeze out more out of it.

Not really sure if it is relevant in the context of your case, but you could 
perhaps try tweaking the maximum amount of Kafka data fetched from each 
partition per poll.
You can do this by setting a higher value for “max.partition.fetch.bytes” in 
the provided config properties when instantiating the consumer; that will 
directly configure the internal Kafka clients.
Generally, all Kafka settings are applicable through the provided config 
properties, so you can perhaps take a look at the Kafka docs to see what else 
there is to tune for the clients.
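
As a rough illustration (Flink 1.2 Scala API, assuming the 0.9 consumer; the broker list, group id, topic name and the 4 MB value are made up for the example), the setting is simply passed in the Properties handed to the consumer:

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaFetchTuning {
  def tunedConsumer(): FlinkKafkaConsumer09[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker1:9092") // placeholder broker list
    props.setProperty("group.id", "throughput-test")       // placeholder group id
    // Forwarded to the internal Kafka client: raise the per-partition fetch
    // size from the 1 MB default to 4 MB per poll.
    props.setProperty("max.partition.fetch.bytes", "4194304")
    new FlinkKafkaConsumer09[String]("events", new SimpleStringSchema(), props)
  }
}

// env.addSource(KafkaFetchTuning.tunedConsumer()).filter(...) -- rest of the job as before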

On March 30, 2017 at 6:11:27 PM, Kamil Dziublinski 
(kamil.dziublin...@gmail.com) wrote:

I'm wondering what I can tweak further to increase this. I was reading in this 
blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 millions per sec with only 20 partitions. So i'm sure I should be able 
to squeeze out more out of it.

Re: Flink 1.2 time window operation

2017-03-30 Thread Tzu-Li (Gordon) Tai
Hi,

Thanks for the clarification.

What are the reasons behind consuming/producing messages from/to Kafka while 
the window has not expired yet?
First, some remarks: sources (in your case the Kafka consumer) will not stop 
fetching / producing data while the windows haven’t fired yet. Does this 
explain what you have plotted in the diagram you attached? (Sorry, I can’t 
really reason about the diagram because I’m not sure what the values on the 
x and y axes represent.)

If you’re writing the outputs of the window operation to Kafka (by adding a 
Kafka sink after the windowing), then yes it should only write to Kafka when 
the window has fired. The characteristics will also differ for different types 
of windows, so you should definitely take a look at the Windowing docs [1] 
about them.
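
For illustration, a minimal sketch of that setup (Flink 1.2 Scala API with the 0.9 connectors; topic and broker names are placeholders, and the count aggregation is just an example): elements reach the Kafka producer only when the tumbling window fires.

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object WindowedToKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker1:9092") // placeholder
    props.setProperty("group.id", "window-demo")           // placeholder

    env.addSource(new FlinkKafkaConsumer09[String]("input-topic", new SimpleStringSchema(), props))
      .map(word => (word, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(3)) // tumbling window (processing time by default)
      .sum(1)
      .map(_.toString)
      // Elements arrive at this sink only when the window above fires.
      .addSink(new FlinkKafkaProducer09[String]("broker1:9092", "output-topic", new SimpleStringSchema()))

    env.execute("windowed results to kafka (sketch)")
  }
}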

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
On March 30, 2017 at 2:37:41 PM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

What are the reasons behind consuming/producing messages from/to Kafka while 
the window has not expired yet?

OOM on flink when job restarts a lot

2017-03-30 Thread Frits Jalvingh
Hello List,

We have a Flink job running reading a Kafka topic, then sending all
messages with a SOAP call. We have had a situation where that SOAP call
failed every time, causing the job to be RESTARTING every few seconds.

After a few hours Flink itself terminates with an OutOfMemoryError. This
means that all flink jobs are now in trouble.

I dumped the heap, and noticed that it was completely filled up with two
things:
- kafka metrics
- HashMap nodes related to PublicSuffixMatcher, a part of Apache HttpClient.

This leads me to believe that the restarting somehow retains references to
some old failed classes/classloaders?

Of course I will repair the root cause, the failing job, but I would also
like to fix things so that Flink does not die when something like this
happens. I can of course set things like the max number of retries but I do
not like that: I rather have the thing retry indefinitely so that when
stuff is repaired the job continues normally.
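
(For reference, the retry limit I am referring to is Flink's restart strategy; a minimal flink-conf.yaml sketch with illustrative values:)

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s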

I tried to find information about how Flink loads jobs but I could not make
much of it.

How can I ensure that Flink does not run out of memory like this?

We're using Flink 1.1.1 and Kafka 0.9.0.1.

Thanks for your time,

Frits


Re: deploying flink in AWS - some teething issues

2017-03-30 Thread Chakravarthy varaga
Hi,

With regards to logging (both Flink and application-specific logs) within
the container, are there best practices that you know of for getting the logs
to a centralized location?
For example, the Flink TM's logs are local inside the container and I don't
wish to write to shared/mounted volumes; this means I have to run a separate
daemon inside the container, alongside the TM, to ship these logs to another
server.
Also, I don't see that Flink provides syslog support out of the box to
connect to rsyslog etc.

Can you please advise on a way to go here?

Best Regards
CVP

On Wed, Mar 29, 2017 at 11:33 AM, Patrick Lucas 
wrote:

> For 1., I think the standard approach would be to specify from without
> what the heap size should be. If you want an *x* MB heap, you could set
> your container memory limit to 1.3 * *x* or so (to account for overhead)
> and set taskmanager.heap.mb: *x* in your config.
>
> The other way around—e.g. from inside the container determine its memory
> limit and divide it by 1.3—sounds interesting though, so please share if
> you have success with that.
>
> For 2. I don't think there's really a good way yet to monitor the health
> of containerized jobs directly, so probably your best bet is to watch the
> job's metrics from outside the Flink cluster.
>
> --
> Patrick Lucas
>
> On Wed, Mar 29, 2017 at 10:58 AM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>> Hi,
>>
>> Any updates here? I'm sure many would have faced similar issues like
>> these, any help here is highly appreciated.
>>
>> Best Regards
>> CVP
>>
>> On Tue, Mar 28, 2017 at 5:47 PM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>>If the flink cluster is conainerized and managed through by a
>>> container orchestrator,
>>>
>>> 1.  the orchestrator allocates resources for each JM. TM etc., say
>>> if the container (TM) needs to run with 2G RAM, how should this allocation
>>> be honoured by the TM when its JVM starts. I'm thinking of wrapping up a
>>> script that determines the resource allocation for the container and writes
>>> the flink-conf.yaml before the TM starts the process. Is this the way to go?
>>>
>>> 2. The container orchestrator looks at health of the containers and
>>> is however unaware of the job health status runnning inside the
>>> container/cluster. How should this be determined?
>>>
>>> Best Regards
>>> CVP
>>>
>>>
>>>
>>
>


Web Dashboard reports 0 for sent/received bytes and records

2017-03-30 Thread Mohammad Kargar
Hi,

I have a simple job where FlinkKafkaConsumer08 reads data from a topic
(with 8 partitions) and a custom Sink implementation inserts the data into
Accumulo database. I can verify that the data is getting inserted into the
db by adding a simple System.out.println(...) to the invoke method of the
sink and also by monitoring Accumulo stats. However, the Flink Web Dashboard
reports 0 for sent/received bytes and records (see attached file for a
snapshot). Any thoughts?

*Hadoop platform*: Hortonworks Data Platform 2.5
*Flink*: flink-1.2.0-bin-hadoop27-scala_2.10

Thanks,
Mohammad


Re: Web Dashboard reports 0 for sent/received bytes and records

2017-03-30 Thread Konstantin Gregor
Hi Mohammad,

that's normal. The source will never have a value other than zero in the
"Records received" column, only in the "Records sent" column. But since
your whole program is chained to one subtask, the source will never send
anything to another subtask, because there is none.
Same for the sink, it will only count values it receives from its
preceding subtask, but due to chaining, there is no preceding subtask.
Thus there will only be zeros in your dashboard.
You can add your own Accumulators or Metrics and see the number of
records in your operators through that, or you could also disable the
chaining (but chaining is a good thing for performance, so you probably
don't want to do that).
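
For example, a minimal sketch of the Metrics approach (Flink 1.2 metrics API; the operator, the metric name and the AccumuloSink stand-in are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

class CountingMap extends RichMapFunction[String, String] {
  @transient private var records: Counter = _

  override def open(parameters: Configuration): Unit = {
    // Registers a counter that shows up in the dashboard / metric reporters.
    records = getRuntimeContext.getMetricGroup.counter("recordsSeen")
  }

  override def map(value: String): String = {
    records.inc()
    value
  }
}

// stream.map(new CountingMap).addSink(new AccumuloSink)                    // keep chaining
// stream.map(new CountingMap).disableChaining().addSink(new AccumuloSink) // or break the chain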

Hope that helps,
best regards

Konstantin

On 30.03.2017 16:48, Mohammad Kargar wrote:
> Hi,
> 
> I have a simple job where FlinkKafkaConsumer08 reads data from a topic
> (with 8 partitions) and a custom Sink implementation inserts the data
> into Accumulo database. I can verify that the data is getting inserted
> into the db by adding simple system.out.println(...) to the invoke
> method of the sink and also monitoring Accumulo stats. However Flink Web
> Dashboard reports 0 for sent/received bytes and records (see attached
> file for a snapshot). Any thoughts?
> 
> *Hadoop platform*: Hortonworks Data Platform 2.5
> *Flink*: flink-1.2.0-bin-hadoop27-scala_2.10
> 
> Thanks,
> Mohammad

-- 
Konstantin Gregor * konstantin.gre...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Cogrouped Stream never triggers tumbling event time window

2017-03-30 Thread Andrea Spina
Dear community,

I finally solved the issue I had bumped into.
Basically the cause of the problem was the behavior of my input: the incoming
rates of the two event types were very different (the second event type
arrived late and only sparsely in event time).

The solution I employed was to assign timestamps and watermarks to the
source stream just before splitting it into the streams handling the first
and second event types. I suppose this solved my problem because of the
getCurrentWatermark() method used by the EventTimeTrigger, which I think
returns the minimum watermark across the inputs scoped by the TriggerContext.
So the window was hanging because of the incoming rate of the second-type
stream.
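
A minimal sketch of what I mean (Flink 1.2 Scala API; the Event type, its fields and the 10-second out-of-orderness bound are made up for the example):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

object WatermarkBeforeSplit {
  case class Event(kind: String, ts: Long, payload: String)

  // Assign timestamps/watermarks on the combined source stream, then split it,
  // so both branches advance on the same watermark.
  def splitAfterWatermarking(source: DataStream[Event]): (DataStream[Event], DataStream[Event]) = {
    val withTs = source.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
        override def extractTimestamp(e: Event): Long = e.ts
      })
    (withTs.filter(_.kind == "first"), withTs.filter(_.kind == "second"))
  }
}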

Hope it could help someone in the future.

Cheers,

Andrea



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373p12468.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Async Functions and Scala async-client for mySql/MariaDB database connection

2017-03-30 Thread Andrea Spina
Dear Flink community,

I started to use Async Functions in Scala with Flink 1.2.0 in order to
retrieve enrichment information from a MariaDB database. To do that, I first
employed the classical JDBC library (org.mariadb.jdbc) and it worked as
expected.

Due to the blocking behavior of jdbc, I'm trying to use this library
https://github.com/mauricio/postgresql-async/tree/master/mysql-async
which promises to offer a subset of features in a non-blocking fashion.

Sadly I'm not able to use it.

Following the async function code.

*
object AsyncEnricher {
  case class OutputType(field1: FieldType, field2: FieldType)
}

class AsyncEnricher(configuration: MariaDBConfig)
extends AsyncFunction[InputType, OutputType]
with Serializable
with AutoCloseable
with LazyLogging {

  private val queryString = s"SELECT  FROM [table] WHERE
 = ;"

  implicit lazy val executor =
ExecutionContext.fromExecutor(Executors.directExecutor())

  private lazy val mariaDBClient: Connection = {
val config = createConfiguration(configuration)
val connection = new MySQLConnection(config)
Await.result(connection.connect, 5 seconds)
  }

  override def asyncInvoke(input: InputType, collector:
AsyncCollector[OutputType]): Unit = {

val queryResult = mariaDBClient.sendPreparedStatement(queryString,
Seq(input.fieldToSearch))

queryResult.map(_.rows) onSuccess {
  case Some(resultSet) =>
Try {
  resultSet.head(0).asInstanceOf[FieldType]
} match {
  case Success(value) =>
collector.collect(Iterable(OutputType(value, value)))
  case Failure(e) =>
logger.error(s"retrieving value from MariaDB raised $e:
$queryString executed")
}
  case _ => logger.error(s"value not found: $queryString executed")
}

queryResult onFailure {
  case e: Throwable =>
logger.error(s"retrieving location volume from MariaDB raised $e:
$queryString executed")
}

  }

  override def close(): Unit = {
Try(mariaDBClient.disconnect).recover {
  case t: Throwable => logger.info(s"MariaDB cannot be closed -
${t.getMessage}")
}
  }

}
*

Follows the stack

/
TimerException{java.lang.IllegalStateException: Timer service is shut down}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Timer service is shut down
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118)
at
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
... 7 more

java.lang.NullPointerException
at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343)
at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
/

I think the problem involves connection.connect returning a Future, and
therefore the Await. This is different from the JDBC driver, which worked
like a charm. I tried moving the Await out of the lazy val.

Can't wait for your opinion. Thank you so much in advance.

Andrea 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Functions-and-Scala-async-client-for-mySql-MariaDB-database-connection-tp12469.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: deploying flink in AWS - some teething issues

2017-03-30 Thread Patrick Lucas
I think Log4j includes a Syslog appender—the log4j config included with
Flink just logs to the logs/ dir, but you should just be able to modify it
(log4j.properties) to suit your needs.
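
A rough sketch of what that could look like in log4j.properties (the syslog host and facility are placeholders; I haven't verified this exact configuration on a Flink container):

log4j.rootLogger=INFO, file, syslog

# keep Flink's default "file" appender as shipped, and add a syslog appender:
log4j.appender.syslog=org.apache.log4j.net.SyslogAppender
log4j.appender.syslog.SyslogHost=rsyslog.example.com:514
log4j.appender.syslog.Facility=LOCAL0
log4j.appender.syslog.layout=org.apache.log4j.PatternLayout
log4j.appender.syslog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n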

--
Patrick Lucas

On Thu, Mar 30, 2017 at 2:39 PM, Chakravarthy varaga <
chakravarth...@gmail.com> wrote:

> Hi,
>
> With regards to logging (both Flink & application specific logs)
> within the container, are there best practices that you know of to get the
> logs to a centralized locations..
> For e.g. the flink TM's log are local inside the container and I don't
> wish to write to shared/mounted volumes, this means that I have to run
> separate daemon running inside the container along with the TM to transport
> these logs to another server or so...
> + I don't see that flink provides support for syslog to be able to
> connect to rsyslog etc.,
>
> Can you please advice a way to go here?
>
> Best Regards
> CVP
>
> On Wed, Mar 29, 2017 at 11:33 AM, Patrick Lucas  > wrote:
>
>> For 1., I think the standard approach would be to specify from without
>> what the heap size should be. If you want an *x* MB heap, you could set
>> your container memory limit to 1.3 * *x* or so (to account for overhead)
>> and set taskmanager.heap.mb: *x* in your config.
>>
>> The other way around—e.g. from inside the container determine its memory
>> limit and divide it by 1.3—sounds interesting though, so please share if
>> you have success with that.
>>
>> For 2. I don't think there's really a good way yet to monitor the health
>> of containerized jobs directly, so probably your best bet is to watch the
>> job's metrics from outside the Flink cluster.
>>
>> --
>> Patrick Lucas
>>
>> On Wed, Mar 29, 2017 at 10:58 AM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Any updates here? I'm sure many would have faced similar issues like
>>> these, any help here is highly appreciated.
>>>
>>> Best Regards
>>> CVP
>>>
>>> On Tue, Mar 28, 2017 at 5:47 PM, Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
 Hi Team,

If the flink cluster is conainerized and managed through by a
 container orchestrator,

 1.  the orchestrator allocates resources for each JM. TM etc., say
 if the container (TM) needs to run with 2G RAM, how should this allocation
 be honoured by the TM when its JVM starts. I'm thinking of wrapping up a
 script that determines the resource allocation for the container and writes
 the flink-conf.yaml before the TM starts the process. Is this the way to 
 go?

 2. The container orchestrator looks at health of the containers and
 is however unaware of the job health status runnning inside the
 container/cluster. How should this be determined?

 Best Regards
 CVP



>>>
>>
>


Re: Flink 1.2 time window operation

2017-03-30 Thread Dominik Safaric
> First, some remarks here -  sources (in your case the Kafka consumer) will 
> not stop fetching / producing data when the windows haven’t fired yet.
> 

This is certainly true. However, the plot shows the number of records produced 
per second, where each record was assigned a created-at timestamp at creation 
time, before being pushed back to Kafka. Sorry I did not clarify this before. 
Because of this I would expect a certain lag. Of course, messages will not be 
produced into Kafka only at the exact moment of window expiry, with the 
producer then shutting down; what concerns me is that messages were produced 
to Kafka before the first window expired - hence the questions.

> If you’re writing the outputs of the window operation to Kafka (by adding a 
> Kafka sink after the windowing), then yes it should only write to Kafka when 
> the window has fired.


Hence, the behaviour that you’ve described and we expected did not occur.

If it would help, I can share the source code and the detailed Flink configuration.

Cheers,
Dominik

> On 30 Mar 2017, at 13:09, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi,
> 
> Thanks for the clarification.
> 
>> What are the reasons behind consuming/producing messages from/to Kafka while 
>> the window has not expired yet?
> 
> First, some remarks here -  sources (in your case the Kafka consumer) will 
> not stop fetching / producing data when the windows haven’t fired yet. Does 
> this explain what you have plotted in the diagram you attached (sorry, I 
> can’t really reason about the diagram because I’m not so sure what the values 
> of the x-y axes represent)?
> 
> If you’re writing the outputs of the window operation to Kafka (by adding a 
> Kafka sink after the windowing), then yes it should only write to Kafka when 
> the window has fired. The characteristics will also differ for different 
> types of windows, so you should definitely take a look at the Windowing docs 
> [1] about them.
> 
> Cheers,
> Gordon
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
>  
> 
> On March 30, 2017 at 2:37:41 PM, Dominik Safaric (dominiksafa...@gmail.com 
> ) wrote:
> 
>> What are the reasons behind consuming/producing messages from/to Kafka while 
>> the window has not expired yet?



Monitoring REST API and YARN session

2017-03-30 Thread Mohammad Kargar
How can I access the REST APIs for monitoring when the cluster is launched in
a YARN session?


s3 read while writing inside sink function

2017-03-30 Thread Sathi Chowdhury
Hi fellow Flink enthusiasts,

I am trying to figure out a recommended way to read S3 data while I am writing 
to S3 using BucketingSink.



BucketingSink<String> s3Sink = new BucketingSink<String>("s3://" + entityBucket + "/")
        .setBucketer(new EntityCustomBucketer())
        .setWriter(new StringWriter())
        …….


The issue is that I want to write to S3 conditionally, based on whether the 
file name already exists there.
I wanted to bake this check into the sink itself; any clue on what is needed 
for this?
Thanks
Sathi

