Re: Latency Measurement

2017-07-17 Thread Chesnay Schepler

Hello,

As for 1), my suspicion is that this is caused by chaining. If the map 
function is chained to the Kafka source then the latency markers are 
always forwarded immediately, regardless of what your map function is doing.
If the map function is indeed chained to the source, could you try again 
after breaking the chain, e.g. by calling `X.map(...).disableChaining()`, and 
report back?
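
A minimal sketch of what that would look like (a sketch only: it assumes a 
String-typed stream and a FlinkKafkaConsumer instance named kafkaConsumer; the 
sleep just stands in for the map logic described below):

DataStream<String> mapped = env
    .addSource(kafkaConsumer)
    .map(value -> {
        Thread.sleep(100);   // simulated processing delay
        return value;
    })
    .disableChaining();      // break the chain so latency markers are not forwarded immediately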


As for 2), I don't think this is possible right now.

Regards,
Chesnay

On 17.07.2017 12:42, Paolo Cristofanelli wrote:

Hi,

I would like to understand how to measure the latency of a record.
I have set up a simple project with a Kafka consumer that reads from a 
topic and performs a simple map (with a thread sleep inside).


In order to measure the latency of this mapper I have added 
env.getConfig().setLatencyTrackingInterval(10);


After that, I was planning to access the latency through the web UI, but the 
related graph does not show any values.
I do not understand why; I was expecting the graph to show at least the sleep 
duration.


I also have another question:

I am using a count window, aggregating every 100 input records and 
then I perform a map. I want to see the latency as the difference 
between the time at which the output record is emitted and the arrival 
time of the earliest input record.


For example, the first value arrives at time x. By x+5 all 100 values have 
arrived and the system can aggregate them. The map operation then runs and 
the output record is emitted at time x+15.

I would like to obtain 15 as latency.
Do you have any suggestion on how to proceed?

Thanks for your time,
Paolo Cristofanelli





Re: Flink Jobs disappers

2017-07-08 Thread Chesnay Schepler
If a TaskManager ran out of memory there should be something in the 
JobManager logs about an unreachable TaskManager.
That said, there should also be something in the JobManager logs about 
the job disappearing...


Could you set the logging level to DEBUG, run the job again, and provide 
us (or me directly) with the logs?


Regards,
Chesnay

On 08.07.2017 08:44, G.S.Vijay Raajaa wrote:

HI Chesnay,


I am currently using Flink 1.3 in Docker containers. I am not using it in HA 
mode. I have 3 task managers and one job manager. This happens randomly and 
not every time. Does it mean the task manager ran out of memory? I am using 
more slots than the available cores; I assume compute is shared round-robin. 
Any pointers to tuning and HA setup would be greatly appreciated.


Regards,
Vijay Raajaa GS

On Sat, Jul 8, 2017 at 12:04 PM, Chesnay Schepler <ches...@apache.org 
<mailto:ches...@apache.org>> wrote:


Hello,

could you tell us a bit more about your setup? Which Flink version
you're using, whether HA is enabled, does this happen every time, etc.?
Regards,
Chesnay


On 06.07.2017 21:43, G.S.Vijay Raajaa wrote:

HI,

I am using the Flink TaskManager and JobManager as Docker
containers. Strangely, the jobs disappear from the web portal
after some time. The jobs don't move to the failed state either.
Any pointers would be really helpful; I am not able to get a clue
from the logs.

Kindly let me know if I need specific tuning, and how to persist
the uploaded jars.

Regards,
Vijay Raajaa G S








Re: Flink Jobs disappers

2017-07-08 Thread Chesnay Schepler

Hello,

could you tell us a bit more about your setup? Which Flink version 
you're using, whether HA is enabled, does this happen every time, etc.?

Regards,
Chesnay

On 06.07.2017 21:43, G.S.Vijay Raajaa wrote:

HI,

I am using the Flink TaskManager and JobManager as Docker containers. 
Strangely, the jobs disappear from the web portal after some time. The jobs 
don't move to the failed state either. Any pointers would be really helpful; 
I am not able to get a clue from the logs.

Kindly let me know if I need specific tuning, and how to persist the 
uploaded jars.


Regards,
Vijay Raajaa G S





Re: Flink shaded table API

2017-07-25 Thread Chesnay Schepler

This sounds similar to https://issues.apache.org/jira/browse/FLINK-6173.

On 25.07.2017 13:07, nragon wrote:

Let's see if I can sample this :P.


First i'm reading from kafka.

FlinkKafkaConsumer010 consumer =
KafkaSource.consumer(this.zookeeper, this.sourceName, 5);
  
consumer.assignTimestampsAndWatermarks(KafkaTimestampExtractor.extractor());



Then, converting my object(DataParameterMap) into a Row

return ViewMapFunction.map(env.addSource(consumer), this.entity,
this.typeInfo);


After that, I register the table

tableEnv.registerTableSource("tableX", )


and I execute an SQL query ("SELECT APN, DATA_VOLUME_DOWN, DATA_VOLUME_UP, MSISDN
FROM PGW_VIEW_A"):

tableEnv.sql(sql).printSchema();

The error happens at this point.
I'm using version 1.3.1









Re: a lot of connections in state "CLOSE_WAIT"

2017-07-25 Thread Chesnay Schepler

Hello,

Could you tell us which browser you are using, including the version?
(and maybe try out if the issue persists with a different one)

Regards,
Chesnay

On 25.07.2017 05:20, XiangWei Huang wrote:

hi,

Sorry for replying so late.
I have run into this issue again, and the list keeps growing constantly even if
I close the page, until the website becomes unavailable.

This issue appears each time I add metrics for a job from the web UI.

By the way, the Flink version is 1.3.1



Regards,
XiangWei









Re: Custom Kryo serializer

2017-07-24 Thread Chesnay Schepler

Copy of a mail i sent to the user mailing list only:

Raw state can only be used when implementing an operator, not a function.

For functions you have to use Managed Operator State. Your function will 
have to implement
the CheckpointedFunction interface and create a ListStateDescriptor 
that you register in initializeState.


On 24.07.2017 16:26, Boris Lublinsky wrote:

Is there a chance, this can be answered?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
https://www.lightbend.com/


Begin forwarded message:

*From: *Boris Lublinsky <boris.lublin...@lightbend.com 
<mailto:boris.lublin...@lightbend.com>>

*Subject: **Re: Custom Kryo serializer*
*Date: *July 19, 2017 at 8:28:16 AM CDT
*To: *user@flink.apache.org <mailto:user@flink.apache.org>, 
ches...@apache.org <mailto:ches...@apache.org>


Thanks for the reply, but I am not using it for managed state, but 
rather for the raw state

In my implementation I have the following

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] {

  // The managed keyed state, see
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
  var modelState: ValueState[ModelToServeStats] = _
  var newModelState: ValueState[ModelToServeStats] = _

  // The raw state -
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
  var currentModel: Option[Model] = None
  var newModel: Option[Model] = None

Where current and new model are instances of the trait for which I 
implement serializer
According to documentation 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state


“/Raw State/ is state that operators keep in their own data 
structures. When checkpointed, they only write a sequence of bytes 
into the checkpoint. Flink knows nothing about the state’s data 
structures and sees only the raw bytes.”


So I was assuming that I need to provide serializer for this.
Am I missing something?





Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
https://www.lightbend.com/



-- Forwarded message --
From: *Chesnay Schepler* <ches...@apache.org 
<mailto:ches...@apache.org>>

Date: Wed, Jul 19, 2017 at 1:34 PM
Subject: Re: Custom Kryo serializer
To: user@flink.apache.org <mailto:user@flink.apache.org>


Hello,

I assume you're passing the class of your serializer in a 
StateDescriptor constructor.


If so, you could add a breakpoint in 
Statedescriptor#initializeSerializerUnlessSet,
and check what typeInfo is created and which serializer is created 
as a result.


One thing you could try right away is registering your serializer 
for the Model implementations,

instead of the trait.

Regards,
Chesnay


On 14.07.2017 15:50, Boris Lublinsky wrote:

Hi
I have several implementations of my Model trait,

trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

None of them is serializable, but they are used in the state definition.

So I implemented a custom serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor

class ModelSerializerKryo extends Serializer[Model] {

  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads bytes and returns a new object of the specified concrete type.
   *  This method should not be called directly; instead this serializer can be
   *  passed to Kryo read methods that accept a serializer. */
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
    import ModelSerializerKryo._

    val mType = input.readLong().asInstanceOf[Int]
    val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the bytes for the object to the output.
   *  This method should not be called directly; instead this serializer can be
   *  passed to Kryo write methods that accept a serializer. */
  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    output.writeLong(value.getType)
    output.write(value.toBytes)
  }
}

Re: Is that possible for flink to dynamically read and change configuration?

2017-07-24 Thread Chesnay Schepler
So I don't know why it doesn't work (it should, as far as I know), but as a 
workaround you could maintain an ArrayList or similar in your function and 
only write/read the ListState in snapshotState/initializeState.
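
A minimal sketch of that workaround (class and field names are made up for 
illustration; it mirrors the CoFlatMapFunction from the quoted code below):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConfigHoldingFunction
        implements CoFlatMapFunction<String, String, String>, CheckpointedFunction {

    // working copy used by flatMap1/flatMap2
    private final List<String> localMetrics = new ArrayList<>();
    // Flink-managed operator state, only touched in snapshot/initialize
    private transient ListState<String> metricsState;

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        metricsState.clear();
        for (String m : localMetrics) {
            metricsState.add(m);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        metricsState = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("metrics", TypeInformation.of(new TypeHint<String>() {})));
        if (ctx.isRestored()) {
            for (String m : metricsState.get()) {
                localMetrics.add(m);
            }
        }
    }

    @Override
    public void flatMap1(String value, Collector<String> out) {
        // read the current config from the local list here
    }

    @Override
    public void flatMap2(String config, Collector<String> out) {
        localMetrics.clear();
        localMetrics.add(config);
    }
}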


On 24.07.2017 17:10, ZalaCheung wrote:

Hi all,

Does anyone have an idea about the non-keyed managed state problem below?
I think all the functions in the testFunc class should share the ListState 
"metrics", but after I add an element to the ListState in flatMap2, I cannot 
retrieve it in flatMap1.



Desheng Zhang


On Jul 24, 2017, at 22:06, ZalaCheung 
<gzzhangdesh...@corp.netease.com 
<mailto:gzzhangdesh...@corp.netease.com>> wrote:


Hi Chesnay,

Thank you very much. I now tried to leave out the default value of the 
ListState and to use the CoFlatMap function with managed state. But what 
surprised me is that the state does not seem to be shared by the two streams.


My test code is shown below.

DataStream<String> result = stream
    .connect(control)
    .flatMap(new testFunc());

public static class testFunc implements CoFlatMapFunction<String, String, String>, CheckpointedFunction {

    private ListState<String> metrics;

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        ListStateDescriptor<String> metricsStateDescriptor =
            new ListStateDescriptor<>(
                "metrics", TypeInformation.of(new TypeHint<String>() {}));
        metrics = functionInitializationContext.getOperatorStateStore()
            .getListState(metricsStateDescriptor);
    }

    @Override
    public void flatMap1(String s, Collector<String> collector) throws Exception {
        String myMetrics = null;
        for (String element : metrics.get()) {
            logger.info("element in metric: " + s);
            myMetrics = element;
        }
        if (myMetrics == null) {
            logger.info("Not initialized");
        } else {
            logger.info("initialized: " + myMetrics);
        }
    }

    @Override
    public void flatMap2(String s, Collector<String> collector) throws Exception {
        metrics.clear();
        metrics.add(s);
        for (String element : metrics.get()) {
            logger.info("element in metric: " + element);
        }
    }
}

I connected the two streams (stream and control) and used the CoFlatMapFunction 
on them. For the control stream, I send a string and the expected log line is 
printed:

*- element in metric: heyjude*
Then I send another string to the first stream.
But the log prints:
*- Not initialized*

I am confused. I successfully receive the message on the control stream and 
add the string to the ListState, but when I try to read the ListState in 
flatMap1, I get nothing.


Thanks.
Desheng Zhang



On Jul 24, 2017, at 21:01, Chesnay Schepler <ches...@apache.org 
<mailto:ches...@apache.org>> wrote:


Hello,

That's an error in the documentation, only the ValueStateDescriptor 
has a defaultValue constructor argument.


Regards,
Chesnay

On 24.07.2017 14:56, ZalaCheung wrote:

Hi Martin,

Thanks for your advice. That's really helpful. I am using the push 
scenario. I am now having some trouble because of the state I want 
to maintain. For me, the simplest way is to maintain a ValueState 
in a CoFlatMapFunction (actually a RichCoFlatMapFunction). But the rich 
function can only be used on a keyed stream, and for a connected 
stream, at least in my scenario, I should not use the keyBy() 
method (actually it seems keyBy() is not allowed on a connected stream).

Thus, instead of using a rich function for keyed managed state, I 
tried to use CheckpointedFunction for my non-keyed state. However, 
with CheckpointedFunction I can only use ListState, which only has 
an add() and an iterator method. I am not sure whether I can just replace 
the element in the ListState. What exactly makes me stuck is that I 
cannot initialize my ListState with a ListStateDescriptor: it says 
there is no constructor for an initialization value. I actually saw 
that on the official documentation.


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Tuple2<String, Integer>> descriptor =
        new ListStateDescriptor<>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
            Tuple2.of(0L, 0L));
    checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    if (context.isRestored()) {
        for (Tuple2<String, Integer> element : checkpointedState.get()) {
            bufferedElements.add(element);
        }
    }
}



But in my code (Flink 1.3.1), it says there is no constructor for 
three arguments (the third argument in the example above is the 
default value). I am really confused.


How can I maintain my state for the CoFlatMap function?


Thanks
 Desheng Zhang


On

Re: Custom Kryo serializer

2017-07-24 Thread Chesnay Schepler
The docs provide a somewhat good overview on how to interact with 
managed state: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state.html#using-managed-operator-state


To use the custom serializer you can supply the type class in the 
StateDescriptor constructor when initializing the state:


private ListState<Model> currentModel = null;
...

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Model> descriptor = new ListStateDescriptor<>(
        "current-model", Model.class);
    currentModel = context.getOperatorStateStore().getListState(descriptor);
}
...

Note that the above is for /unkeyed/ state. For keyed state you would 
call context.getKeyedStateStore() instead.


Do not be confused that I'm using a ListState; that's just how the 
interface works...the idea being that if you were to decrease the 
parallelism upon restore a single instance of your function may receive 
multiple states.


Anyway, you can add a Model to the state using ListState#add and clear 
the entire ListState with ListState#clear.
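
For completeness, a sketch of the snapshotState counterpart under the same 
assumptions (currentModelValue is a made-up name for the in-memory model the 
function keeps between checkpoints):

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // replace whatever was checkpointed before with the current model
    currentModel.clear();
    if (currentModelValue != null) {
        currentModel.add(currentModelValue);
    }
}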


I'm not sure though how well you can introduce options here.

Hope this helps,
Chesnay

On 24.07.2017 16:31, Boris Lublinsky wrote:

Thanks Chesney,
Can you, please, point me to any example?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
https://www.lightbend.com/

On Jul 24, 2017, at 9:27 AM, Chesnay Schepler <ches...@apache.org 
<mailto:ches...@apache.org>> wrote:


Copy of a mail i sent to the user mailing list only:

Raw state can only be used when implementing an operator, not a function.

For functions you have to use Managed Operator State. Your function 
will have to implement
the CheckpointedFunction interface and create a ListStateDescriptor 
that you register in initializeState.


On 24.07.2017 16:26, Boris Lublinsky wrote:

Is there a chance, this can be answered?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
https://www.lightbend.com/


Begin forwarded message:

*From: *Boris Lublinsky <boris.lublin...@lightbend.com 
<mailto:boris.lublin...@lightbend.com>>

*Subject: **Re: Custom Kryo serializer*
*Date: *July 19, 2017 at 8:28:16 AM CDT
*To: *user@flink.apache.org <mailto:user@flink.apache.org>, 
ches...@apache.org <mailto:ches...@apache.org>


Thanks for the reply, but I am not using it for managed state, but 
rather for the raw state

In my implementation I have the following

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] {

  // The managed keyed state, see
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
  var modelState: ValueState[ModelToServeStats] = _
  var newModelState: ValueState[ModelToServeStats] = _

  // The raw state -
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
  var currentModel: Option[Model] = None
  var newModel: Option[Model] = None

Where current and new model are instances of the trait for which I 
implement serializer
According to documentation 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state


“/Raw State/ is state that operators keep in their own data 
structures. When checkpointed, they only write a sequence of bytes 
into the checkpoint. Flink knows nothing about the state’s data 
structures and sees only the raw bytes.”


So I was assuming that I need to provide serializer for this.
Am I missing something?





Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
https://www.lightbend.com/



-- Forwarded message --
From: *Chesnay Schepler* <ches...@apache.org 
<mailto:ches...@apache.org>>

Date: Wed, Jul 19, 2017 at 1:34 PM
Subject: Re: Custom Kryo serializer
To: user@flink.apache.org <mailto:user@flink.apache.org>


Hello,

I assume you're passing the class of your serializer in a 
StateDescriptor constructor.


If so, you could add a breakpoint in 
Statedescriptor#initializeSerializerUnlessSet,
and check what typeInfo is created and which serializer is created 
as a result.


One thing you could try right away is registering your serializer 
for the Model implementations,

instead of the trait.

Regards,
Chesnay


On 14.07.2017 15:50, Boris Lublinsky wrote:

Hi
I have several implementations of my Model trait,

trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

None of them is serializable, but they are used in the state definition.

So I implemented a custom serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor

Re: a lot of connections in state "CLOSE_WAIT"

2017-07-26 Thread Chesnay Schepler
So this /only/ happens when you select a metric? Without a selected 
metric everything works fine?


Are the metrics you selected shown correctly?

Did you modify the "jobmanager.web.refresh-interval" setting? (maybe 
check the flink-conf-yaml for the current setting)


On 26.07.2017 04:57, XiangWei Huang wrote:

hi,

The browser I am using is Google Chrome, version 59.0.3071.115, and the 
issue persists when I try Firefox.

Regards,
XiangWei

On 25.07.2017, at 17:48, Chesnay Schepler <ches...@apache.org> wrote:

Hello,

Could you tell us which browser you are using, including the version?
(and maybe try out if the issue persists with a different one)

Regards,
Chesnay

On 25.07.2017 05:20, XiangWei Huang wrote:

hi,

Sorry for replying so late.
I have run into this issue again, and the list keeps growing constantly even if
I close the page, until the website becomes unavailable.

This issue appears each time I add metrics for a job from the web UI.

By the way, the Flink version is 1.3.1



Regards,
XiangWei











Re: Flink monitor rest API question

2017-07-19 Thread Chesnay Schepler

Hello,

Looks like you stumbled upon a bug in our REST API and are using a client 
that is stricter than others.


I will create a JIRA for this.
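
For reference, a minimal sketch of the kind of client call described in the 
report below (a sketch only: it assumes the pre-3.x com.mashape Unirest API, 
the JobManager's default web port 8081, and a placeholder job id):

import com.mashape.unirest.http.HttpResponse;
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;

public class JobStatusCheck {
    public static void main(String[] args) throws UnirestException {
        // placeholder host/port and job id -- adjust to your setup
        HttpResponse<String> response =
            Unirest.get("http://localhost:8081/jobs/7684be6004e4e955c2a558a9bc463f65").asString();
        System.out.println(response.getStatus() + ": " + response.getBody());
    }
}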

Regards,
Chesnay

On 19.07.2017 13:31, Will Du wrote:

Hi folks,
I am using a Java REST client (the Unirest library) to GET a job status from 
the Flink REST API. I got an exception: unsupported content encoding 'UTF8'.

Do you know how to resolve it? The Postman client works fine.
Thanks,
Will






Re: Custom Kryo serializer

2017-07-19 Thread Chesnay Schepler

Hello,

I assume you're passing the class of your serializer in a 
StateDescriptor constructor.


If so, you could add a breakpoint in 
StateDescriptor#initializeSerializerUnlessSet,
and check what typeInfo is created and which serializer is created as a 
result.


One thing you could try right away is registering your serializer for 
the Model implementations,

instead of the trait.
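
A sketch of that suggestion, in Java for brevity (PMMLModel and TensorFlowModel 
are the implementation classes from the code below; the original code is Scala, 
but addDefaultKryoSerializer works the same way on the ExecutionConfig):

env.getConfig().addDefaultKryoSerializer(PMMLModel.class, ModelSerializerKryo.class);
env.getConfig().addDefaultKryoSerializer(TensorFlowModel.class, ModelSerializerKryo.class);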

Regards,
Chesnay

On 14.07.2017 15:50, Boris Lublinsky wrote:

Hi
I have several implementations of my Model trait,

trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

None of them is serializable, but they are used in the state definition.

So I implemented a custom serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor

class ModelSerializerKryo extends Serializer[Model] {

  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads bytes and returns a new object of the specified concrete type.
   *  This method should not be called directly; instead this serializer can be
   *  passed to Kryo read methods that accept a serializer. */
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
    import ModelSerializerKryo._

    val mType = input.readLong().asInstanceOf[Int]
    val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the bytes for the object to the output.
   *  This method should not be called directly; instead this serializer can be
   *  passed to Kryo write methods that accept a serializer. */
  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    output.writeLong(value.getType)
    output.write(value.toBytes)
  }
}

object ModelSerializerKryo {
  private val factories = Map(
    ModelDescriptor.ModelType.PMML.value -> PMMLModel,
    ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}

And added the following to configure it:

// Add custom serializer
env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])

I can see checkpoint messages in the output console, but I never hit 
a breakpoint in the serializer.

Any suggestions?



Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com 
https://www.lightbend.com/





Re: Executing Flink server From IntelliJ

2017-07-19 Thread Chesnay Schepler

Hello,

this problem is described in 
https://issues.apache.org/jira/browse/FLINK-6689.


Basically, if you want to use the LocalFlinkMiniCluster you should use a 
TestStreamEnvironment instead.

The RemoteStreamEnvironment only works with a proper Flink cluster.
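
A rough sketch of that suggestion, in Java for brevity (it assumes the 
TestStreamEnvironment constructor from flink-test-utils that takes a 
LocalFlinkMiniCluster and a parallelism; config, parallelism and buildGraph 
correspond to the Scala code quoted below):

LocalFlinkMiniCluster flinkCluster = new LocalFlinkMiniCluster(config, false);
flinkCluster.start(true);

// run against the mini cluster instead of a RemoteStreamEnvironment
StreamExecutionEnvironment env = new TestStreamEnvironment(flinkCluster, parallelism);
buildGraph(env);
env.execute();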

Regards,
Chesnay

On 14.07.2017 15:43, Boris Lublinsky wrote:

Hi,
I am trying to upgrade my project from Flink 1.2 to 1.3 and getting 
problems while trying to run Flink server from my Intellij project. 
The code


// Execute on the local Flink server - to test queryable state
def executeServer(): Unit = {

  // We use a mini cluster here for sake of simplicity, because I don't want
  // to require a Flink installation to run this demo. Everything should be
  // contained in this JAR.
  val port = 6124
  val parallelism = 4
  val config = new Configuration()
  config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
  config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
  config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
  // In a non MiniCluster setup queryable state is enabled by default.
  config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true)

  // Create a local Flink server
  val flinkCluster = new LocalFlinkMiniCluster(config, false)
  try {
    // Start server and create environment
    flinkCluster.start(true)
    val env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", port, parallelism)
    // Build Graph
    buildGraph(env)
    env.execute()
    val jobGraph = env.getStreamGraph.getJobGraph
    // Submit to the server and wait for completion
    flinkCluster.submitJobAndWait(jobGraph, false)
  } catch {
    case e: Exception => e.printStackTrace()
  }
}
Worked on version 1.2, but on 1.3 I am getting

08:41:29,179 INFO 
 org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting 
FlinkMiniCluster.
08:41:29,431 INFO  akka.event.slf4j.Slf4jLogger   
 - Slf4jLogger started
08:41:29,498 INFO  Remoting- Starting 
remoting
08:41:29,730 INFO  Remoting- Remoting 
started; listening on addresses :[akka.tcp://flink@localhost:6124]
08:41:29,762 INFO  org.apache.flink.runtime.blob.BlobServer  - Created 
BLOB server storage directory 
/var/folders/3m/52z04fgs3hq88mzft9l0fsrmgn/T/blobStore-4e626961-9155-47e9-b1b8-f835a8435cfc
08:41:29,765 INFO  org.apache.flink.runtime.blob.BlobServer  - Started 
BLOB server at 0.0.0.0:54319 - max concurrent requests: 50 - max 
backlog: 1000
08:41:29,775 INFO  org.apache.flink.runtime.metrics.MetricRegistry - 
No metrics reporter configured, no metrics will be exposed/reported.
08:41:29,781 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist 
- Started memory archivist akka://flink/user/archive
08:41:29,786 INFO  org.apache.flink.runtime.jobmanager.JobManager  - 
Starting JobManager at akka.tcp://flink@localhost:6124/user/jobmanager.
08:41:29,787 INFO 
 org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
 - Proposing leadership to contender 
org.apache.flink.runtime.jobmanager.JobManager@59cd5ef5 @ 
akka.tcp://flink@localhost:6124/user/jobmanager
08:41:29,796 INFO  akka.event.slf4j.Slf4jLogger   
 - Slf4jLogger started
08:41:29,804 INFO  Remoting- Starting 
remoting
08:41:29,813 INFO  Remoting- Remoting 
started; listening on addresses :[akka.tcp://flink@localhost:54320]
08:41:29,825 INFO  akka.event.slf4j.Slf4jLogger   
 - Slf4jLogger started
08:41:29,830 INFO  Remoting- Starting 
remoting
08:41:29,836 INFO  Remoting- Remoting 
started; listening on addresses :[akka.tcp://flink@localhost:54321]
08:41:29,846 INFO  org.apache.flink.runtime.jobmanager.JobManager  - 
JobManager akka.tcp://flink@localhost:6124/user/jobmanager was granted 
leadership with leader session ID 
Some(61d3ed9b-1c24-4bbf-99ef-c2a891613473).
08:41:29,847 INFO 
 org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService 
 - Received confirmation of leadership for leader 
akka.tcp://flink@localhost:6124/user/jobmanager , 
session=61d3ed9b-1c24-4bbf-99ef-c2a891613473
08:41:29,850 INFO 
 org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - 
Messages have a max timeout of 1 ms
08:41:29,851 INFO 
 org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager 
 - Received leader address but not running in leader ActorSystem. 
Cancelling registration.
08:41:29,855 INFO 
 org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary 
file directory '/var/folders/3m/52z04fgs3hq88mzft9l0fsrmgn/T': 
total 464 GB, usable 353 GB (76.08% usable)
08:41:30,493 INFO 
 org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - 
Allocated 363 MB for network buffer pool (number of memory segments: 
11634, bytes per 

Re: Custom Kryo serializer

2017-07-19 Thread Chesnay Schepler

Raw state can only be used when implementing an operator, not a function.

For functions you have to use Managed Operator State. Your function will 
have to implement
the CheckpointedFunction interface and create a ListStateDescriptor 
that you register in initializeState.


On 19.07.2017 15:28, Boris Lublinsky wrote:
Thanks for the reply, but I am not using it for managed state, but 
rather for the raw state

In my implementation I have the following

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] {

  // The managed keyed state, see
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
  var modelState: ValueState[ModelToServeStats] = _
  var newModelState: ValueState[ModelToServeStats] = _

  // The raw state -
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
  var currentModel: Option[Model] = None
  var newModel: Option[Model] = None

Where current and new model are instances of the trait for which I 
implement serializer
According to documentation 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state


“/Raw State/ is state that operators keep in their own data 
structures. When checkpointed, they only write a sequence of bytes 
into the checkpoint. Flink knows nothing about the state’s data 
structures and sees only the raw bytes.”


So I was assuming that I need to provide serializer for this.
Am I missing something?





Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
https://www.lightbend.com/



-- Forwarded message --
From: *Chesnay Schepler* <ches...@apache.org <mailto:ches...@apache.org>>
Date: Wed, Jul 19, 2017 at 1:34 PM
Subject: Re: Custom Kryo serializer
To: user@flink.apache.org <mailto:user@flink.apache.org>


Hello,

I assume you're passing the class of your serializer in a 
StateDescriptor constructor.


If so, you could add a breakpoint in 
Statedescriptor#initializeSerializerUnlessSet,
and check what typeInfo is created and which serializer is created as 
a result.


One thing you could try right away is registering your serializer for 
the Model implementations,

instead of the trait.

Regards,
Chesnay


On 14.07.2017 15:50, Boris Lublinsky wrote:

Hi
I have several implementations of my Model trait,

trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

None of them is serializable, but they are used in the state definition.

So I implemented a custom serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor

class ModelSerializerKryo extends Serializer[Model] {

  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads bytes and returns a new object of the specified concrete type.
   *  This method should not be called directly; instead this serializer can be
   *  passed to Kryo read methods that accept a serializer. */
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
    import ModelSerializerKryo._

    val mType = input.readLong().asInstanceOf[Int]
    val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the bytes for the object to the output.
   *  This method should not be called directly; instead this serializer can be
   *  passed to Kryo write methods that accept a serializer. */
  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    output.writeLong(value.getType)
    output.write(value.toBytes)
  }
}

object ModelSerializerKryo {
  private val factories = Map(
    ModelDescriptor.ModelType.PMML.value -> PMMLModel,
    ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}

And added the following to configure it:

// Add custom serializer
env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])

I can see checkpoint messages in the output console, but I never hit 
a breakpoint in the serializer.

Any suggestions?



Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
https://www.lightbend.com/










Re: Is that possible for flink to dynamically read and change configuration?

2017-07-24 Thread Chesnay Schepler

Hello,

That's an error in the documentation; only the ValueStateDescriptor has 
a defaultValue constructor argument.
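
For illustration, a minimal comparison (a sketch, not from the original 
thread; the state names are made up):

// ValueStateDescriptor accepts a default value:
ValueStateDescriptor<Long> valueDesc =
    new ValueStateDescriptor<>("counter", TypeInformation.of(Long.class), 0L);
// ListStateDescriptor does not -- only a name and a type:
ListStateDescriptor<Long> listDesc =
    new ListStateDescriptor<>("buffered", TypeInformation.of(Long.class));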


Regards,
Chesnay

On 24.07.2017 14:56, ZalaCheung wrote:

Hi Martin,

Thanks for your advice. That's really helpful. I am using the push 
scenario. I am now having some trouble because of the state I want to 
maintain. For me, the simplest way is to maintain a ValueState in a 
CoFlatMapFunction (actually a RichCoFlatMapFunction). But the rich 
function can only be used on a keyed stream, and for a connected stream, 
at least in my scenario, I should not use the keyBy() method (actually it 
seems keyBy() is not allowed on a connected stream).

Thus, instead of using a rich function for keyed managed state, I tried 
to use CheckpointedFunction for my non-keyed state. However, with 
CheckpointedFunction I can only use ListState, which only has an add() 
and an iterator method. I am not sure whether I can just replace the 
element in the ListState. What exactly makes me stuck is that I cannot 
initialize my ListState with a ListStateDescriptor: it says there is no 
constructor for an initialization value. I actually saw that on the official 
documentation.


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Tuple2<String, Integer>> descriptor =
        new ListStateDescriptor<>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
            Tuple2.of(0L, 0L));
    checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    if (context.isRestored()) {
        for (Tuple2<String, Integer> element : checkpointedState.get()) {
            bufferedElements.add(element);
        }
    }
}



But in my code (Flink 1.3.1), it says there is no constructor for three 
arguments (the third argument in the example above is the default 
value). I am really confused.


How can I maintain my state for the CoFlatMap function?


Thanks
 Desheng Zhang


On Jul 24, 2017, at 19:44, Martin Eden > wrote:


Hey Desheng,

Some options that come to mind:
- Cave man style: Stop and restart job with new config.
- Poll scenario: You could build your own thread that periodically 
loads from the db into a per worker accessible cache.
- Push scenario: have a config stream (based off of some queue) which 
you connect to your data stream via the connect operator. In the 
CoFlatMapFunction that you have to provide, you basically update Flink 
state from the config flatMap, read the Flink state from the data 
flatMap, and pass it along with the data. Then the specific operator 
that uses the config can always get it from the tuple that comes 
alongside the data, say in the invoke method of a sink. Example here.


Hope that gives u some ideas,
M


On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung 
> wrote:


Hi all,

I am now trying to implement an anomaly detection algorithm on
Flink, which actually means implementing a Map operator that does
anomaly detection on time series.
At first I want to read the configuration (like which Kafka source
host to read the datastream from and which sink address to write data
to) from MongoDB. It contains some system metrics I want to
monitor.

What I did was read configuration from mongo DB and set as
configuration of flink.

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration conf = new Configuration();

JSONObject jsonConfiguration = readConfiguration();

conf.setInteger("period", jsonConfiguration.getInt("period"));
conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
conf.setDouble("metric", jsonConfiguration.getDouble("metric"));

see.getConfig().setGlobalJobParameters(conf);

The readConfiguration() method reads the configuration from MongoDB.

As the code above shows, I set globalJobParameters to let all my
workers share these parameters, including the metric I want to
monitor. But maybe at some point I want to change the metric I
monitor. I think one possible way is to dynamically (or periodically)
re-read the configuration and reset the globalJobParameters to make
the Flink program change the metric to monitor. Is that possible?

Thanks
Desheng Zhang









Re: a lot of connections in state "CLOSE_WAIT"

2017-07-03 Thread Chesnay Schepler

Hello,

is this list constantly growing? If you reload the WebUI do they pile up 
again?


My guess would be the watermarks display is overloading the metrics handler.
If I remember correctly the WebUI keeps fetching the watermark metrics 
regardless of what page you're looking at.

Things to investigate would be whether it continues to query for 
watermarks for finished

jobs, and whether requests are batched across tasks & jobs.

Regards,
Chesnay

On 30.06.2017 18:05, Nico Kruber wrote:

Hi XiangWei,
this could be a resource leak, i.e. a socket not getting closed, but I was
unable to reproduce that behaviour. Maybe Chesnay (cc'd) has an idea on how/
where this may happen.

Can you tell us a bit more on what you where doing / how the webinterface was
used? Is there a way to reproduce the behaviour?



Nico


PS: please don't post text output as images - this makes reading/finding the
email and following the thread much more complicated. Please copy the text
instead.

On Thursday, 29 June 2017 06:36:47 CEST XiangWei Huang wrote:

A lot of connections in state "CLOSE_WAIT" appeared a few days after the
Flink JobManager was started.

The process name of pid 20180 is 'flink jobManager'.
The stack trace is below:





Re: Registering custom metrics does not work

2017-07-06 Thread Chesnay Schepler

Hello,

Please provide more information as to how it is not working as expected.

Does it throw an exception or log a warning? Is the metric not registered
at all, or does the value not change?

On 06.07.2017 08:10, wyphao.2007 wrote:

Hi all,
I want to know an element's latency before it is written to Elasticsearch, 
so I registered a custom metric as follows:


class CustomElasticsearchSinkFunction extends 
ElasticsearchSinkFunction[EventEntry] {

  private var metricGroup: Option[MetricGroup] = None
  private var latency: Long = _

  private def init(runtimeContext: RuntimeContext): Unit = {
if (metricGroup.isEmpty) {
  metricGroup = Some(runtimeContext.getMetricGroup)
  metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", 
ScalaGauge[Long](() => latency))

}
  }

  def createIndexRequest(element: EventEntry, runtimeContext: 
RuntimeContext): IndexRequest = {

init(runtimeContext)
latency = System.currentTimeMillis() - element.executeTime.getMillis
Requests.indexRequest.index("test").`type`("event").source(element.json)
  }

  override def process(element: EventEntry,
   runtimeContext: RuntimeContext,
   requestIndexer: RequestIndexer): Unit =
requestIndexer.add(createIndexRequest(element, runtimeContext))
}

but that does not seem to work. Does anyone know why?

Regards
wyp





Re: Registering custom metrics does not work

2017-07-06 Thread Chesnay Schepler

How are you verifying whether it is registered?

For the sake of covering all angles: are you certain that 
createIndexRequest is called?


On 06.07.2017 08:51, wyphao.2007 wrote:

Hi Chesnay, thank you for your reply

The code above does not get registered at all.



在2017年07月06 14时45分, "Chesnay Schepler"<ches...@apache.org>写道:


Hello,

Please provide more information as to how it is not working as
expected.

Does it throw an exception or log a warning? Is the metric not
registered at all, or does the value not change?

 On 06.07.2017 08:10, wyphao.2007 wrote:

Hi all,
I want to know an element's latency before it is written to
Elasticsearch, so I registered a custom metric as follows:

class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {

  private var metricGroup: Option[MetricGroup] = None
  private var latency: Long = _

  private def init(runtimeContext: RuntimeContext): Unit = {
    if (metricGroup.isEmpty) {
      metricGroup = Some(runtimeContext.getMetricGroup)
      metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
    }
  }

  def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
    init(runtimeContext)
    latency = System.currentTimeMillis() - element.executeTime.getMillis
    Requests.indexRequest.index("test").`type`("event").source(element.json)
  }

  override def process(element: EventEntry,
                       runtimeContext: RuntimeContext,
                       requestIndexer: RequestIndexer): Unit =
    requestIndexer.add(createIndexRequest(element, runtimeContext))
}

but that does not seem to work. Does anyone know why?

Regards
wyp







Re: Invalid path exception

2017-08-01 Thread Chesnay Schepler
One problem I know of is that Windows paths with a scheme are not 
detected as Windows paths, as documented in FLINK-6889.

They generally still work though (/maybe/ by chance).

I just verified that calling FileInputFormat#setFilePath() works for 
both "file:///" and "file:/" on Windows.
(I'm assuming that we're talking about the FileInputFormat, if I'm wrong 
please correct me)
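
A tiny sketch of the two path forms in question, using the setFilePath call 
from the report below (the file path itself is just an example):

format.setFilePath("file:///C:/proj/test/a.txt");
// or equivalently, with a single slash after the scheme:
format.setFilePath("file:/C:/proj/test/a.txt");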


@Mohit Could you provide the full stacktrace or a small self-contained 
example to reproduce the issue?


On 31.07.2017 22:19, Stephan Ewen wrote:

Hmm, looks like a bug then... Could you open a JIRA issue for that?

@Chesnay are you aware of Path issues on Windows?

On Mon, Jul 31, 2017 at 8:01 PM, Mohit Anchlia <mohitanch...@gmail.com 
<mailto:mohitanch...@gmail.com>> wrote:


I tried that as well but same result

format.setFilePath("file:/c:/proj/test/a.txt.txt");


Caused by: _java.nio.file.InvalidPathException_: Illegal char <:>
at index 2: /c:/proj/test/a.txt.txt





On Mon, Jul 31, 2017 at 6:04 AM, Stephan Ewen <se...@apache.org
<mailto:se...@apache.org>> wrote:

I think that on Windows, you need to use "file:/c:/proj/..."
with just one slash after the scheme.



On Mon, Jul 31, 2017 at 1:24 AM, Mohit Anchlia
<mohitanch...@gmail.com <mailto:mohitanch...@gmail.com>> wrote:

This is what I tried and it doesn't work. Is this a bug?

format.setFilePath("file:///c:/proj/test/a.txt.txt");


On Sun, Jul 30, 2017 at 2:10 PM, Chesnay Schepler
<ches...@apache.org <mailto:ches...@apache.org>> wrote:

Did the path by chance start with file://C:/... ?

If so, please try file:///C: ...


On 30.07.2017 22:28, Mohit Anchlia wrote:

I am using flink 1.3.1 and getting this exception. Is
there a workaround?

Caused by: _java.nio.file.InvalidPathException_:
Illegal char <:> at index 2:
/C:/Users/m/default/flink-example/pom.xml

at sun.nio.fs.WindowsPathParser.normalize(Unknown Source)

at sun.nio.fs.WindowsPathParser.parse(Unknown Source)

at sun.nio.fs.WindowsPathParser.parse(Unknown Source)











Re: Invalid path exception

2017-08-01 Thread Chesnay Schepler

Let's move the discussions to FLINK-7330.

On 01.08.2017 13:15, Chesnay Schepler wrote:
One problem I know of is that Windows paths with a scheme are not 
detected as Windows paths, as documented in FLINK-6889.

They generally still work though (/maybe/ by chance).

I just verified that calling FileInputFormat#setFilePath() works for 
both "file:///" and "file:/" on Windows.
(I'm assuming that we're talking about the FileInputFormat, if I'm 
wrong please correct me)


@Mohit Could you provide the full stacktrace or a small self-contained 
example to reproduce the issue?


On 31.07.2017 22:19, Stephan Ewen wrote:

Hmm, looks like a bug then... Could you open a JIRA issue for that?

@Chesnay are you aware of Path issues on Windows?

On Mon, Jul 31, 2017 at 8:01 PM, Mohit Anchlia 
<mohitanch...@gmail.com <mailto:mohitanch...@gmail.com>> wrote:


I tried that as well but same result

format.setFilePath("file:/c:/proj/test/a.txt.txt");


Caused by: _java.nio.file.InvalidPathException_: Illegal char <:>
at index 2: /c:/proj/test/a.txt.txt





On Mon, Jul 31, 2017 at 6:04 AM, Stephan Ewen <se...@apache.org
<mailto:se...@apache.org>> wrote:

I think that on Windows, you need to use "file:/c:/proj/..."
with just one slash after the scheme.



On Mon, Jul 31, 2017 at 1:24 AM, Mohit Anchlia
<mohitanch...@gmail.com <mailto:mohitanch...@gmail.com>> wrote:

This is what I tried and it doesn't work. Is this a bug?

format.setFilePath("file:///c:/proj/test/a.txt.txt");


On Sun, Jul 30, 2017 at 2:10 PM, Chesnay Schepler
<ches...@apache.org <mailto:ches...@apache.org>> wrote:

Did the path by chance start with file://C:/... ?

If so, please try file:///C: ...


On 30.07.2017 22:28, Mohit Anchlia wrote:

I am using flink 1.3.1 and getting this exception.
Is there a workaround?

Caused by: _java.nio.file.InvalidPathException_:
Illegal char <:> at index 2:
/C:/Users/m/default/flink-example/pom.xml

at sun.nio.fs.WindowsPathParser.normalize(Unknown
Source)

at sun.nio.fs.WindowsPathParser.parse(Unknown Source)

at sun.nio.fs.WindowsPathParser.parse(Unknown Source)













Re: a lot of connections in state "CLOSE_WAIT"

2017-08-08 Thread Chesnay Schepler

FLINK-7368 may be the reason for this behavior.

On 31.07.2017 03:54, XiangWei Huang wrote:

1. Yes and yes.
2. Yes, it was shown correctly.
3. I didn't modify this setting.

On 26.07.2017, at 18:06, Chesnay Schepler [via Apache Flink User Mailing 
List archive.] <[hidden email]> wrote:


So this /only/ happens when you select a metric? Without a selected 
metric everything works fine?


Are the metrics you selected shown correctly?

Did you modify the "jobmanager.web.refresh-interval" setting? (maybe 
check the flink-conf-yaml for the current setting)


On 26.07.2017 04:57, XiangWei Huang wrote:

hi,

The browser I am using is Google Chrome, version 59.0.3071.115, and the 
issue persists when I try Firefox.

Regards,
XiangWei

On 25.07.2017, at 17:48, Chesnay Schepler [hidden email] wrote:

Hello,

Could you tell us which browser you are using, including the version?
(and maybe try out if the issue persists with a different one)

Regards,
Chesnay

On 25.07.2017 05:20, XiangWei Huang wrote:

hi,

Sorry for replying so late.
I have run into this issue again, and the list keeps growing constantly even if
I close the page, until the website becomes unavailable.

This issue appears each time I add metrics for a job from the web UI.

By the way, the Flink version is 1.3.1



Regards,
XiangWei



















Re: JMX stats reporter with all task manager/job manager stats aggregated?

2017-08-07 Thread Chesnay Schepler

Hello,

there is no central place where JMX metrics are aggregated.

You can configure a port range for the reporter to prevent port 
conflicts on the same machine.


metrics.reporter.jmx.port: 8789-8790

You can find out which port was used by checking the logs.
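
Putting it together, the reporter section of flink-conf.yaml would look 
roughly like this (a sketch combining the settings already quoted in this 
thread):

metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789-8790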

Regards,
Chesnay

On 05.08.2017 03:06, Ajay Tripathy wrote:
Sorry: neglected to include the stack trace for JMX failing to 
instantiate from a taskmanager:


017-08-05 00:59:09,388 INFO  org.apache.flink.runtime.metrics.MetricRegistry
   - Configuring JMXReporter with {port=8789, 
class=org.apache.flink.metrics.jmx.JMXReporter}.
2017-08-05 00:59:09,402 ERROR org.apache.flink.runtime.metrics.MetricRegistry   
- Could not instantiate metrics reporter jmx. Metrics might not be 
exposed/reported.
java.lang.RuntimeException: Could not start JMX server on any configured port. 
Ports: 8789
at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.java:127)
at 
org.apache.flink.runtime.metrics.MetricRegistry.(MetricRegistry.java:120)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.createTaskManagerComponents(TaskManager.scala:2114)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1873)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1769)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1637)
at 
org.apache.flink.runtime.taskmanager.TaskManager.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala)
at 
org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:146)
at 
org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:142)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at 
org.apache.flink.yarn.YarnTaskManagerRunner.runYarnTaskManager(YarnTaskManagerRunner.java:142)
at org.apache.flink.yarn.YarnTaskManager$.main(YarnTaskManager.scala:64)
at org.apache.flink.yarn.YarnTaskManager.main(YarnTaskManager.scala)

On Fri, Aug 4, 2017 at 3:51 PM, Ajay Tripathy > wrote:


Hi, I'm running flink jobmanagers/taskmanagers with yarn. I've
turned on the JMX reporter in my flink-conf.yaml as follows:

metrics.reporters:jmx

metrics.reporter.jmx.class:org.apache.flink.metrics.jmx.JMXReporter


I was wondering:

Is there a JMX server with the aggregated stats across all jobs /
tasks? If so, where is it located? It appears that a JMX server starts
for every single taskmanager, and the jobmanagers do not have the
data reported from the taskmanagers.


I'm not sure if this is related, but when I try to specify a port
for the jmx reporter, like this:

metrics.reporter.jmx.port: 8789

I'm receiving an error where JMX servers from different task
managers fight for that port, and fail to start.






Re: Problem: Please check that all IDs specified via `uid(String)` are unique.

2017-05-03 Thread Chesnay Schepler

Is the method called multiple times with different input sets?

Does the issue occur regardless of which uid you set?

On 03.05.2017 20:21, Rami Al-Isawi wrote:

Hi,

Nope, that is the only uid in the whole project. The 
“userRawDataStream" comes from a splitter:

public static class Splitter implements OutputSelector<EventRaw> {
    @Override
    public Iterable<String> select(EventRaw value) {
        List<String> output = new ArrayList<>();
        output.add(value.type);
        return output;
    }
}
Any ideas how to get better debug messages?
Regards,
-Rami

On 3 May 2017, at 14:12, Chesnay Schepler <ches...@apache.org 
<mailto:ches...@apache.org>> wrote:


Hello,

was a uid set on "userRawDataStream", or any of it's parent 
transformations?


On 03.05.2017 12:59, Rami Al-Isawi wrote:

Hi,

I am trying to set uids. I keep getting this (Flink.1.2):

Exception in thread "main" java.lang.IllegalArgumentException: Hash 
collision on user-specified ID. Most likely cause is a non-unique 
ID. Please check that all IDs specified via `uid(String)` are unique.


Here is the code snippet.
public DataStream processUserActions(DataStream userRawDataStream, Time duration, Sonar.ReportSpan span) {
    return userRawDataStream
        .map(userRaw -> new UserActionValue(userRaw)).startNewChain().uid("11")
        .keyBy("userRaw.eventRaw.env_key", "userRaw.eventRaw.tag",
               "userRaw.eventRaw." + span.name(), "userRaw.type")
        .window(GlobalWindows.create())
        .trigger(new TimedTrigger(duration))
        .sum("count") //.startNewChain().uid("")
        .map(userActionsCount -> new UserAction(userActionsCount, span)); //.startNewChain().uid("")
}
I made sure that they are unique and even with one uid, the error is there. I 
guess there is an easy fix, but I cannot see it.
Regards,
-Rami










Re: Why not add flink-connectors to flink dist?

2017-05-15 Thread Chesnay Schepler
You can either package the connector into the user-jar or place it in 
the /lib directory of the distribution.


On 15.05.2017 11:09, yunfan123 wrote:

So how can I use it?
Should every jar file I submit contain the specific connector class?
Can I package it into flink-dist?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-not-add-flink-connectors-to-flink-dist-tp13134.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: [DISCUSS] Release 1.3.0 RC0 (Non voting, testing release candidate)

2017-05-09 Thread Chesnay Schepler
This RC (and 1.4-SNAPSHOT for that matter) cannot be compiled on Windows 
due to the rat-plugin not detecting certain test savepoints as binary 
files. The files in question are the "savepoints" created by the 
StreamOperatorTestHarness.


This is not a new problem and has happened before, but we need a better 
way to deal with this than me fixing it afterwards whenever it pops up. 
We may want to exclude all files without extensions in the test/resource 
folders, or look into a CI option to verify that the compilation on 
Windows works.


On 09.05.2017 09:43, Stephan Ewen wrote:

@Renjie Liu: A good part of the FLIP-6 code is in there, but not all.

It does run well on Yarn, Mesos, Docker, etc.

We need to finish the FLIP-6 work mainly for full elasticity.


On Tue, May 9, 2017 at 5:24 AM, Renjie Liu > wrote:


Hi, does this include the FLIP6?

On Tue, May 9, 2017 at 2:29 AM Stephan Ewen > wrote:

Did a quick test: Simply adding the

"org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"
helps with NOTICE files,
but does not add the required BSD licence copies.


On Mon, May 8, 2017 at 8:25 PM, Stephan Ewen > wrote:

I did the first pass for the legal check.

  - Source LICENSE and NOTICE are okay

  - In the shaded JAR files, we are not bundling the
license and notice files of the dependencies we include in
the shaded jars.
 => Not a problem for Guava (Apache Licensed)
 => I think it is a problem for ASM (redistribution in
binary form, hence needs a notice of the copy)

  - The Table API / SQL module needs more entries for
Janino / Reflections (both BSD licensed)

So that is definitely a blocker.


On Mon, May 8, 2017 at 12:14 PM, Robert Metzger
> wrote:

Hi Devs,

I've created a first non-voting release candidate for
Flink 1.3.0.
Please use this RC to test as much as you can and
provide feedback to the Flink community. The more we
find and fix now, the better Flink 1.3.0 wil be :)

I've CC'ed the user@ mailing list to get more people
to test it. DO NOT USE THIS RELEASE CANDIDATE IN
PRODUCTION.

I will prepare a google document to synchronize the
testing effort a bit more.

Depending on the number of issues we identify, I hope
that we can do the first VOTEing RC early next week.


-

The release commit is
f94c002991dcce9f1104f8e61b43efb2f8247cb4, located
here:
http://git-wip-us.apache.org/repos/asf/flink/commit/f94c0029


The artifacts are located here:
http://people.apache.org/~rmetzger/flink-1.3.0-rc0/


The maven staging repository is here:

https://repository.apache.org/content/repositories/orgapacheflink-1118




-

Happy testing!

Regards,
Robert



-- 
Liu, Renjie

Software Engineer, MVAD






Re: Group already contains a Metric with the name 'latency'

2017-05-16 Thread Chesnay Schepler

No; you can name operators like this:

stream.map().name("MyUniqueMapFunctionName")

On 16.05.2017 14:50, rizhashmi wrote:

thanks for your reply

*latency* metrics appear to be pushed by AbstractStreamOperator.java

latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));

does it mean this method needs to be overridden?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Group-already-contains-a-Metric-with-the-name-latency-tp13157p13164.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Question about jobmanager.web.upload.dir

2017-05-17 Thread Chesnay Schepler
I don't know why we delete it either, but my guess is that at one point 
this was a temporary directory that we properly cleaned up, and later 
allowed it to be configurable.


Currently, this directory must be in the local file system. We could 
change it to also allow non-local paths, which should be fairly easy to 
do. Add a condition that the directory is only cleaned up when it wasn't 
explicitly configured and we should've covered everything.


In regards to the HistoryServer, of course we could include the 
user-jar, but I'm wondering about the use-cases for it.
It's not like you could "just download it and give it to a JobManager"; 
you would be missing any additional libraries put under /lib, as well as 
the original configuration of the cluster. It would also require extra 
steps (and tools) to identify which dependencies are required.


And there are downsides to this, like a significant increase in the size 
of the job archives (along with a duplication of jars for every job 
submission), and the archiving also becomes more complex.


On 17.05.2017 15:52, Timo Walther wrote:

Hey Mauro,

I'm not aware of any reason for that. I loop in Chesnay, maybe he 
knows why. @Chesnay wouldn't it be helpful to also archive the jars 
using the HistoryServer?


Timo


Am 17.05.17 um 12:31 schrieb Mauro Cortellazzi:

Hi Flink comunity,

is there a particular reason to delete the jobmanager.web.upload.dir 
content when the JobManager restart? Could it be interesting have the 
jar previously uploaded when JM restart? Could the jars be saved to 
HDFS for HA mode?


Thank you for your support.

Mauro








Re: Group already contains a Metric with the name 'latency'

2017-05-16 Thread Chesnay Schepler

Does your job include multiple operators called "Filter"?

On 16.05.2017 13:35, rizhashmi wrote:

I am getting a bunch of warnings in my log files. Can anyone help me sort out this
problem?


2017-04-28 00:20:57,947 WARN  org.apache.flink.metrics.MetricGroup
- Name collision: Group already contains a Metric with the name 'latency'.
Metric will not be reported.[vpc2w2-rep-stage-flink2, taskmanager,
c5c676c57eca2c127ba1243394658e8a, Flink Streaming Job,
Timestamps/Watermarks, 4]
2017-04-28 00:20:57,955 WARN  org.apache.flink.metrics.MetricGroup
- Name collision: Group already contains a Metric with the name 'latency'.
Metric will not be reported.[vpc2w2-rep-stage-flink2, taskmanager,
c5c676c57eca2c127ba1243394658e8a, Flink Streaming Job, Filter, 4]
2017-04-28 00:20:57,962 WARN  org.apache.flink.metrics.MetricGroup
- Name collision: Group already contains a Metric with the name 'latency'.
Metric will not be reported.[vpc2w2-rep-stage-flink2, taskmanager,
c5c676c57eca2c127ba1243394658e8a, Flink Streaming Job,
Timestamps/Watermarks, 1]
2017-04-28 00:20:57,968 WARN  org.apache.flink.metrics.MetricGroup
- Name collision: Group already contains a Metric with the name 'latency'.
Metric will not be reported.[vpc2w2-rep-stage-flink2, taskmanager,
c5c676c57eca2c127ba1243394658e8a, Flink Streaming Job, Filter, 1]
2017-04-28 00:20:57,968 WARN  org.apache.flink.metrics.MetricGroup
- Name collision: Group already contains a Metric with the name 'latency'.
Metric will not be reported.[vpc2w2-rep-stage-flink2, taskmanager,
c5c676c57eca2c127ba1243394658e8a, Flink Streaming Job,
Timestamps/Watermarks, 7]
2017-04-28 00:20:57,976 WARN  org.apache.flink.metrics.MetricGroup
- Name collision: Group already contains a Metric with the name 'latency'.
Metric will not be reported.[vpc2w2-rep-stage-flink2, taskmanager,
c5c676c57eca2c127ba1243394658e8a, Flink Streaming Job, Filter, 7]
2017-04-28 00:20:57,969 WARN  org.apache.flink.metrics.MetricGroup
- Name collision: Group already contains a Metric with the name 'latency'.
Metric will not be reported.[vpc2w2-rep-stage-flink2, taskmanager,
c5c676c57eca2c127ba1243394658e8a, Flink Streaming Job,
Timestamps/Watermarks, 10]
2017-04-28 00:20:57,978 WARN  org.apache.flink.metrics.MetricGroup
- Name collision: Group already contains a Metric with the name 'latency'.
Metric will not be reported.[vpc2w2-rep-stage-flink2, taskmanager,
c5c676c57eca2c127ba1243394658e8a, Flink Streaming Job, Filter, 10]



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Group-already-contains-a-Metric-with-the-name-latency-tp13157.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Flink-derrived operator names cause issues in Graphite metrics

2017-06-12 Thread Chesnay Schepler

So there's 2 issues here:

1. The default names for windows are horrible. They are too long, full
   of special characters, and unstable as reported in FLINK-6464
   
2. The reporter doesn't filter out metrics it can't report.

For 2) we can do 2 things:

 * If a fully assembled metric name is too long the graphite reporter
   will ignore the metric and log a warning.
 * when converting the operator name to a string, limit the total size.
   Say, 40-60 characters. This may not be enough for your use-case though.

I'll create JIRAs for 2), and try to fix them as soon as possible.

A more comprehensive solution will be made as part of FLINK-6464, which 
includes a clean-up/refactoring of operator names.
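
In the meantime, a practical workaround is to give the window operator an explicit, 
short name yourself. A minimal sketch (assuming a keyed tuple stream; names are made up):

// 'input' is assumed to be a DataStream<Tuple2<String, Integer>>
DataStream<Tuple2<String, Integer>> counts = input
        .keyBy(0)
        .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(1)))
        .sum(1)
        .name("hourly-count-window"); // short, stable name instead of the generated TriggerWindow(...) string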


On 12.06.2017 14:45, Carst Tankink wrote:

Hi,

We accidentally forgot to give some operators in our flink stream a 
custom/unique name, and ran into the following exception in Graphite:
‘exceptions.IOError: [Errno 36] File name too long: 
'///TriggerWindow_SlidingEventTimeWindows_60_-60__-FoldingStateDescriptor_serializer=org-apache-flink-api-common-typeutils-base-IntSerializer_655523dd_-initialValue=0_-foldFunction=_24e08d59__-EventTimeTrigger___-WindowedStream-fold_AllWindowedStream-java:
 __/0/buffers/inputQueueLength.wsp'

(some placeholders because it might reveal too much about our platform, sorry. 
The actual filename is quite a bit longer).

The problem seems to be that Flink uses toString for the operator if no name is 
set, and the graphite exporter does not sanitize the output for length.
Is this something that should be posted as a bug?  Or a known limitation that 
we missed in the documentation?

Thanks,
Carst





Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread Chesnay Schepler

Can you provide more of your code (you can also send it to me directly)?

I'm interested in where the startTime/endTime arguments are defined.

On 21.06.2017 10:47, sohimankotia wrote:

I ran the job and monitored it for approx. 20 mins.

I tried with a meter, accumulators, a histogram and a gauge.

Out of those, only the meter and the accumulators were updating values; the others were
only showing a constant value all the time.

  




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Gauge-Metric-is-not-getting-updated-on-Job-Dashboard-tp13842p13886.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread Chesnay Schepler
The reason why the gauge value is not updating is that you are not actually updating the gauge,
but registering a new gauge under the same name. The subsequent registrations are ignored, and should have

logged a warning.

I suggest making your gauge stateful by adding a field for the opTimeInSec, with a setter that you call

in addMetricData(...).

On 21.06.2017 11:48, sohimankotia wrote:

Here it is :


import com.codahale.metrics.SlidingWindowReservoir;
import in.dailyhunt.cis.enrichments.datatype.BasicInfoTuple;
import in.dailyhunt.cis.enrichments.datatype.SinkTuple;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.util.Collector;


public class DBFlatMap extends RichFlatMapFunction {

private transient Meter meter;
private transient org.apache.flink.metrics.Histogram histogram;


@Override
public void open(Configuration parameters) throws Exception {

/*
  some app specific code
 */

com.codahale.metrics.Meter meter1 = new 
com.codahale.metrics.Meter();
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new 
DropwizardMeterWrapper(meter1));

com.codahale.metrics.Histogram histogram1 =
new com.codahale.metrics.Histogram(new 
SlidingWindowReservoir(500));

this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new 
DropwizardHistogramWrapper(histogram1));

}

@Override
public void flatMap(BasicInfoTuple in, Collector out) throws
Exception {
long start = System.currentTimeMillis();
incrementCounter("input-in-group-and-KeyElect-Flow",
this.getRuntimeContext());
this.dbOperations();
addMetricData(start, System.currentTimeMillis());
this.meter.markEvent();
}

private void dbOperations() {
// Db Operation and some app logic
}

public void incrementCounter(String counterName, RuntimeContext
runtimeContext) {
if (runtimeContext == null) {
return;
}

LongCounter lc = runtimeContext.getLongCounter(counterName);
if (lc == null) {
lc = new LongCounter();
runtimeContext.addAccumulator(counterName, lc);
}
lc.add(1L);
}


private void addMetricData(long startTime, long endTime) {
final long opTimeInSec = (endTime - startTime) / 1000;
this.histogram.update(opTimeInSec);
getRuntimeContext().getMetricGroup()
.gauge("DbOpGauge", (Gauge) () -> 
String.valueOf(opTimeInSec));

}


}




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Gauge-Metric-is-not-getting-updated-on-Job-Dashboard-tp13842p13888.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Custom Gauge Metric is not getting updated on Job Dashboard

2017-06-21 Thread Chesnay Schepler
Exactly, you register the gauge once in open(), and modify the code so 
that this gauge returns

different values.
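
In terms of your class, that would look roughly like this (a sketch; the histogram/meter 
registration and everything else stays as it is):

private transient org.apache.flink.metrics.Histogram histogram; // as before
private volatile long lastOpTimeInSec; // written by addMetricData, read by the gauge

@Override
public void open(Configuration parameters) throws Exception {
    // register the gauge exactly once; it re-reads the field every time it is reported
    getRuntimeContext().getMetricGroup()
            .gauge("DbOpGauge", (Gauge<String>) () -> String.valueOf(lastOpTimeInSec));
    // ... histogram/meter registration as before ...
}

private void addMetricData(long startTime, long endTime) {
    lastOpTimeInSec = (endTime - startTime) / 1000; // just update the field
    this.histogram.update(lastOpTimeInSec);
}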

On 21.06.2017 12:04, sohimankotia wrote:

Basically, every time I am calling the add-metric method it is just registering
the gauge again.

I can register this gauge in the open method and then update the
value of the gauge in flatMap.

Right?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Gauge-Metric-is-not-getting-updated-on-Job-Dashboard-tp13842p13891.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-22 Thread Chesnay Schepler

So let's get the obvious question out of the way:

Why are you adding a partitioner when your parallelism is 1?

On 22.06.2017 11:58, sohimankotia wrote:

I have an execution flow (streaming job) with parallelism 1:

  source -> map -> partitioner -> flatmap -> sink

Adding the partitioner starts a new thread, but the partitioner is spending an
average of 2 to 4 minutes while moving data from map to flatmap.

For more details about this  :
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-Custom-Partitioner-in-Streaming-with-parallelism-1-adding-latency-td13766.html

In some link here :
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks

they have mentioned that the

  PipelinedSubpartition is a pipelined implementation to support streaming
data exchange. The SpillableSubpartition is a blocking implementation to
support batch data exchange.

I am not sure how I would use these, or how to reduce the latency from map ->
partitioner -> flatmap.






--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Quick Question...

2017-06-22 Thread Chesnay Schepler

Hello,

in the DataSet API you can do this when specifying your transformations, something
along the lines of dataset.map(...).withParameters(config).
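
For example, a minimal sketch for the DataSet API (parameter names are made up):

// 'data' is an existing DataSet<String>
Configuration config = new Configuration();
config.setString("my.param", "value");

DataSet<String> result = data
        .map(new RichMapFunction<String, String>() {
            private String param;

            @Override
            public void open(Configuration parameters) {
                // the Configuration handed in via withParameters() arrives here
                param = parameters.getString("my.param", "default");
            }

            @Override
            public String map(String value) {
                return param + ":" + value;
            }
        })
        .withParameters(config);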

In the DataStream API you cannot set the Configuration at all.

Note that in both APIs you can also just pass the Configuration into the 
constructor and

store it in a field.

Regards,
Chesnay

On 22.06.2017 19:56, Steve Jerman wrote:

Hi,

I have a quick question…

How do I set the Configuration passed into RichFunction.open?

I *thought* that setting GlobalJobParameters would do it ...

env.getConfig().setGlobalJobParameters(jobParameters);

But it seems not…

Steve





Re: MapR libraries shading issue

2017-06-26 Thread Chesnay Schepler
This looks more like a certificate problem, as described here: 
https://github.com/square/okhttp/issues/2746


I don't think that shading could have anything to do with this.

On 26.06.2017 00:09, ani.desh1512 wrote:

I am trying to use Flink (1.3.0) with MapR(5.2.1). Accordingly, I built Flink
for Mapr as follows with maven 3.1:

/mvn clean install -DskipTests -Dscala.version=2.10.6 -Pvendor-repos,mapr
-Dhadoop.version=2.7.0-mapr-1703 -Dzookeeper.version=3.4.5-mapr-1604/

I, then added /opt/mapr/lib/* to Flink classpath, added Datadog metrics
entry to config and to test the config, started flink service via:
/./bin/jobmanager.sh start local/.
In the jobmanager logs, I see the following error:

/ERROR org.apache.flink.runtime.metrics.MetricRegistry   - Could
not instantiate metrics reporter dghttp. Metrics might not be
exposed/reported.
java.lang.IllegalStateException: Failed contacting Datadog to validate API
key
 at
org.apache.flink.metrics.datadog.DatadogHttpClient.validateApiKey(DatadogHttpClient.java:73)
 at
org.apache.flink.metrics.datadog.DatadogHttpClient.(DatadogHttpClient.java:61)
 at
org.apache.flink.metrics.datadog.DatadogHttpReporter.open(DatadogHttpReporter.java:104)
 at
org.apache.flink.runtime.metrics.MetricRegistry.(MetricRegistry.java:129)
 at
org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:188)
 at
org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1921)
 at
org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2322)
 at
org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2053)
 at
org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2052)
 at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2139)
 at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2117)
 at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2117)
 at scala.util.Try$.apply(Try.scala:161)
 at
org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2172)
 at
org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2117)
 at
org.apache.flink.runtime.jobmanager.JobManager$$anon$10.call(JobManager.scala:1992)
 at
org.apache.flink.runtime.jobmanager.JobManager$$anon$10.call(JobManager.scala:1990)
 at
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1595)
 at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
 at
org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1990)
 at
org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
Caused by: javax.net.ssl.SSLHandshakeException:
sun.security.validator.ValidatorException: PKIX path building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to find
valid certi
fication path to requested target
 at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
 at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1949)
 at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
 at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
 at
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
 at
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
 at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
 at sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
 at
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1062)
 at
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
 at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
 at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
 at
org.apache.flink.shaded.okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:268)
 at
org.apache.flink.shaded.okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:238)
 at
org.apache.flink.shaded.okhttp3.internal.connection.RealConnection.connect(RealConnection.java:149)
 at
org.apache.flink.shaded.okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:192)
 at

Re: Flink metrics related problems/questions

2017-05-19 Thread Chesnay Schepler

1. This shouldn't happen. Do you access the counter from different threads?

2. Metrics in general are not persisted across restarts, and there is no 
way to configure flink to do so at the moment.


3. Counters are sent as gauges since as far as I know StatsD counters 
are not allowed to be decremented.


On 19.05.2017 08:56, jaxbihani wrote:

Background: We are using a job using ProcessFunction which reads data from
kafka fires ~5-10K timers per second and sends matched events to KafkaSink.
We are collecting metrics for collecting no of active timers, no of timers
scheduled etc. We use statsd reporter and monitor using Grafana dashboard &
RocksDBStateBackend backed by HDFS as state.

Observations/Problems:
1. *Counter value suddenly got reset:*  While job was running fine, on one
fine moment, metric of a monotonically increasing counter (Counter where we
just used inc() operation) suddenly became 0 and then resumed from there
onwards. Only exception in the logs were related to transient connectivity
issues to datanodes. Also there was no other indicator of any failure
observed after inspecting system metrics/checkpoint metrics.  It happened
just once across multiple runs of a same job.
2. *Counters not retained during flink restart with savepoint*: Cancelled
job with -s option taking savepoint and then restarted the job using the
savepoint.  After restart metrics started from 0. I was expecting metric
value of a given operator would also be part of state.
3. *Counter metrics getting sent as Gauge*: Using tcpdump I was inspecting
the format in which metric are sent to statsd. I observed that even the
metric which in my code were counters, were sent as gauges. I didn't get why
that was so.

Can anyone please add more insights into why above mentioned behaviors would
have happened?
Also does flink store metric values as a part of state for stateful
operators? Is there any way to configure that?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-related-problems-questions-tp13218.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Flink metrics related problems/questions

2017-05-19 Thread Chesnay Schepler
2. isn't quite accurate actually; metrics on the TaskManager are not 
persisted across restarts.


On 19.05.2017 11:21, Chesnay Schepler wrote:
1. This shouldn't happen. Do you access the counter from different 
threads?


2. Metrics in general are not persisted across restarts, and there is 
no way to configure flink to do so at the moment.


3. Counters are sent as gauges since as far as I know StatsD counters 
are not allowed to be decremented.


On 19.05.2017 08:56, jaxbihani wrote:
Background: We are using a job using ProcessFunction which reads data 
from
kafka fires ~5-10K timers per second and sends matched events to 
KafkaSink.
We are collecting metrics for collecting no of active timers, no of 
timers
scheduled etc. We use statsd reporter and monitor using Grafana 
dashboard &

RocksDBStateBackend backed by HDFS as state.

Observations/Problems:
1. *Counter value suddenly got reset:*  While job was running fine, 
on one
fine moment, metric of a monotonically increasing counter (Counter 
where we

just used inc() operation) suddenly became 0 and then resumed from there
onwards. Only exception in the logs were related to transient 
connectivity

issues to datanodes. Also there was no other indicator of any failure
observed after inspecting system metrics/checkpoint metrics.  It 
happened

just once across multiple runs of a same job.
2. *Counters not retained during flink restart with savepoint*: 
Cancelled

job with -s option taking savepoint and then restarted the job using the
savepoint.  After restart metrics started from 0. I was expecting metric
value of a given operator would also be part of state.
3. *Counter metrics getting sent as Gauge*: Using tcpdump I was 
inspecting

the format in which metric are sent to statsd. I observed that even the
metric which in my code were counters, were sent as gauges. I didn't 
get why

that was so.

Can anyone please add more insights into why above mentioned 
behaviors would

have happened?
Also does flink store metric values as a part of state for stateful
operators? Is there any way to configure that?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-related-problems-questions-tp13218.html
Sent from the Apache Flink User Mailing List archive. mailing list 
archive at Nabble.com.









Re: Flink metrics related problems/questions

2017-05-22 Thread Chesnay Schepler

Yes, that could cause the observed issue.

The default implementations are not thread-safe; if you do concurrent 
writes they may be lost/overwritten.
You will have to either guard accesses to that metric with a 
synchronized block or implement your own thread-safe counter.
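
A sketch of such a thread-safe counter (this is not part of Flink itself):

import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.metrics.Counter;

public class AtomicCounter implements Counter {
    private final AtomicLong count = new AtomicLong();

    @Override public void inc() { count.incrementAndGet(); }
    @Override public void inc(long n) { count.addAndGet(n); }
    @Override public void dec() { count.decrementAndGet(); }
    @Override public void dec(long n) { count.addAndGet(-n); }
    @Override public long getCount() { return count.get(); }
}

// registered like any other counter:
// getRuntimeContext().getMetricGroup().counter("myCounter", new AtomicCounter());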


On 22.05.2017 14:17, Aljoscha Krettek wrote:

@Chesnay With timers it will happen that onTimer() is called from a different 
Thread than the Tread that is calling processElement(). If Metrics updates 
happen in both, would that be a problem?


On 19. May 2017, at 11:57, Chesnay Schepler <ches...@apache.org> wrote:

2. isn't quite accurate actually; metrics on the TaskManager are not persisted 
across restarts.

On 19.05.2017 11:21, Chesnay Schepler wrote:

1. This shouldn't happen. Do you access the counter from different threads?

2. Metrics in general are not persisted across restarts, and there is no way to 
configure flink to do so at the moment.

3. Counters are sent as gauges since as far as I know StatsD counters are not 
allowed to be decremented.

On 19.05.2017 08:56, jaxbihani wrote:

Background: We are using a job using ProcessFunction which reads data from
kafka fires ~5-10K timers per second and sends matched events to KafkaSink.
We are collecting metrics for collecting no of active timers, no of timers
scheduled etc. We use statsd reporter and monitor using Grafana dashboard &
RocksDBStateBackend backed by HDFS as state.

Observations/Problems:
1. *Counter value suddenly got reset:*  While job was running fine, on one
fine moment, metric of a monotonically increasing counter (Counter where we
just used inc() operation) suddenly became 0 and then resumed from there
onwards. Only exception in the logs were related to transient connectivity
issues to datanodes. Also there was no other indicator of any failure
observed after inspecting system metrics/checkpoint metrics.  It happened
just once across multiple runs of a same job.
2. *Counters not retained during flink restart with savepoint*: Cancelled
job with -s option taking savepoint and then restarted the job using the
savepoint.  After restart metrics started from 0. I was expecting metric
value of a given operator would also be part of state.
3. *Counter metrics getting sent as Gauge*: Using tcpdump I was inspecting
the format in which metric are sent to statsd. I observed that even the
metric which in my code were counters, were sent as gauges. I didn't get why
that was so.

Can anyone please add more insights into why above mentioned behaviors would
have happened?
Also does flink store metric values as a part of state for stateful
operators? Is there any way to configure that?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-related-problems-questions-tp13218.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.









Re: Collapsible job plan visualization

2017-05-25 Thread Chesnay Schepler
You should be able to move the separator between the plan view and the 
bottom panel already.


On 25.05.2017 19:45, Flavio Pompermaier wrote:

Hi to all,
In our experience the Flink plan diagram is a nice feature, but it is 
useless almost all the time and it has an annoying interaction with 
the mouse wheel. I suggest making it a collapsible div. IMHO that 
would be an easy change that would definitely improve the user 
experience... what do other Flink users think about this?


Best,
Flavio





Re: Group already contains a Metric with the name 'latency'

2017-05-16 Thread Chesnay Schepler
Generally this isn't an issue; it only means that for some operators the 
latency metrics will not be available. The underlying issue is that the 
metric system has no way to differentiate operators except by their 
name; if the names are identical you end up with a collision.


If you're not using the latency metrics there's nothing to worry about. 
To prevent this warning from occurring you will have to give every 
operator a unique name.
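
For example (hypothetical types, just to illustrate), giving each per-timezone instance 
its own name avoids the collision:

// 'events' is assumed to be a DataStream<Event>; TimezoneFilter is a made-up FilterFunction
DataStream<Event> utc = events.filter(new TimezoneFilter("UTC")).name("Filter-UTC");
DataStream<Event> cet = events.filter(new TimezoneFilter("CET")).name("Filter-CET");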


On 16.05.2017 14:17, rizhashmi wrote:

I think I do have.

In my implementation I am generating rollups for multiple timezones. So I took the
path of creating windows per timezone: for each window a separate trigger instance
creates the window with a new AssignerWithPunctuatedWatermarks instance,
and each time I applied the same filter.

Does this imply an issue?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Group-already-contains-a-Metric-with-the-name-latency-tp13157p13161.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





[DISCUSS] Removal of twitter-inputformat

2017-06-07 Thread Chesnay Schepler

Hello,

I'm proposing to remove the Twitter-InputFormat in FLINK-6710 
, with an open PR you 
can find here .
The PR currently has a +1 from Robert, but Timo raised some concerns 
saying that it is useful for prototyping and

advised me to start a discussion on the ML.

This format is a DelimitedInputFormat that reads JSON objects and turns 
them into a custom tweet class.
I believe this format doesn't provide much value to Flink; there's 
nothing interesting about it as an InputFormat,
as it is purely an exercise in /manually /converting a JSON object into 
a POJO.
This is apparent since you could just as well use 
ExecutionEnvironment#readTextFile(...) and throw the parsing logic

into a subsequent MapFunction.
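
Roughly like this (a sketch; Tweet is a user-defined POJO and Jackson is used for the parsing):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tweet> tweets = env.readTextFile("hdfs:///path/to/tweets")
        .map(new MapFunction<String, Tweet>() {
            private transient ObjectMapper mapper; // com.fasterxml.jackson.databind.ObjectMapper

            @Override
            public Tweet map(String json) throws Exception {
                if (mapper == null) {
                    mapper = new ObjectMapper(); // created lazily; ObjectMapper is not serializable
                }
                return mapper.readValue(json, Tweet.class);
            }
        });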

In the PR i suggested to replace this with a JsonInputFormat, but this 
was a misguided attempt at getting Timo to agree
to the removal. This format has the same problem outlined above, as it 
could be effectively implemented with a one-liner map function.


So the question now is whether we want to keep it, remove it, or replace 
it with something more general.


Regards,
Chesnay


Re: Cassandra connector POJO - tombstone question

2017-06-01 Thread Chesnay Schepler

No, unfortunately I forgot about them :/

On 01.06.2017 19:39, Tarandeep Singh wrote:

Hi Chesnay,

Did your code changes (exposing mapper options) made it in 1.3 release?

Thank you,
Tarandeep

On Wed, Apr 12, 2017 at 2:34 PM, Tarandeep Singh <tarand...@gmail.com 
<mailto:tarand...@gmail.com>> wrote:


Thanks Chesnay, this will work.

Best,
Tarandeep

On Wed, Apr 12, 2017 at 2:42 AM, Chesnay Schepler
<ches...@apache.org <mailto:ches...@apache.org>> wrote:

Hello,

what i can do is add hook like we do for the ClusterBuilder
with which you can provide a set of options that will
be used for every call to the mapper. This would provide you
access with all options that are listed on the page
you linked.

You can find an implementation of this here:
https://github.com/zentol/flink/tree/unknown_cass_options
<https://github.com/zentol/flink/tree/unknown_cass_options>

Note that this branch is on 1.3-SNAPSHOT, but it should be
possible for you to cherry-pick it onto a 1.2 branch.

I will add a ticket for this soon (currently getting timeouts
in JIRA).

Regards,
Chesnay


On 12.04.2017 02:27, Tarandeep Singh wrote:

Hi,

I am using flink-1.2 and Cassandra connector to write to
cassandra tables. I am using POJOs with DataStax
annotations as described here-

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/cassandra.html

<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/cassandra.html>

My question is- how are nulls handles by cassandra sink?

Datastax documentation on Mapper states that if we are
using POJOs to store data in Cassandra table and the POJO
has null fields, then it can create tombstones, so one
should use saveNullFields(false) so that null fields are
not persisted -

https://docs.datastax.com/en/developer/java-driver/3.1/manual/object_mapper/using/#mapper-options

<https://docs.datastax.com/en/developer/java-driver/3.1/manual/object_mapper/using/#mapper-options>

Default behavior is to persist null fields.

In cassandra pojo sink code, I don't see this option set
on Mapper-

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java

<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java>

So does this mean, I can expect to see tombstones when
writing data (assuming my POJOs have null fields). If yes,
can we expose an option to disable saving null fields.

Thanks,
Tarandeep








Re: Streaming job monitoring

2017-06-08 Thread Chesnay Schepler

Hello Flavio,

I'm not sure what source you are using, but it looks like the 
ContinouosFileMonitoringSource which works with 2 operators.
The first operator (what is displayed as the actual Source) emits input 
splits (chunks of files that should be read) and passes

these to the second operator (split reader).

So the numRecordsOut of the source is the number of splits created.

For sinks, the numRecordsOut counter is essentially unused; mostly 
because it is kind of redundant as it should in general
be equal to the numRecordsIn counter. The same applies to the 
numRecordsIn counter of sources.
(although in this particular case it would be interesting to know how 
many files the source has read...)


This is something we would have to solve for each source/sink 
individually, which is kind of tricky as the numRecordsIn/-Out
metrics are internal metrics and not accessible in user-defined 
functions without casting.


In your case the reading of the chunks and writing by the sink is done 
in a single task. The webUI is not aware of operators

and thus can't display the individual metrics nicely.

The metrics tab doesn't aggregate metrics across subtasks, so I can see 
how that would be cumbersome to use. We can't solve
aggregation in general as when dealing with Gauges we just don't know 
whether we can aggregate them at all.
Frankly, this is also something I don't really want to implement in the 
first place as there are dedicated systems for this
exact use-case. The WebFrontend is not meant as a replacement for these 
systems.


In general I would recommend setting up a dedicated metrics system like 
Graphite/Ganglia to store metrics, and using Grafana

or something similar to actually monitor them.

Regards,
Chesnay

On 08.06.2017 11:43, Flavio Pompermaier wrote:

Hi to all,
we've successfully run our first streaming job on a Flink cluster (with 
some problems with the shading of Guava..) and it really outperforms 
Logstash, from the point of view of indexing speed and ease of use.


However there's only one problem: when the job is running, in the Job 
Monitoring UI, I see 2 blocks within the plan visualizer:


 1. Source: Custom File Source (without any info about the file I'm
reading)
 2. Split Reader: Custom File source -> Sink: unnamed

None of them helps me to understand which data I'm reading or writing 
(while within the batch jobs this is usually displayed). Moreover, in 
the task details the "Byte sent/Records sent" are totally senseless, I 
don't know what is counted (see the attached image if available)...I 
see documents indexed on ES but in the Flink Job UI I don't see 
anything that could help to understand how many documents are sent to 
ES or from one function (Source) to the other (Sink).
I tried to display some metrics and there I found something but I hope 
this is not the usual way of monitoring streaming jobs...am I doing 
something wrong? Or the streaming jobs should be monitored with 
something else?


Best,
Flavio





Re: Can't get keyed messages from Kafka

2017-06-13 Thread Chesnay Schepler
You have to create your own implementation that deserializes the byte 
arrays into whatever type you want to use.
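
A sketch of such a schema that produces (key, value) string pairs (the UTF-8 decoding 
is just an example; adjust it to whatever your producer actually writes):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class StringKeyedSchema implements KeyedDeserializationSchema<Tuple2<String, String>> {

    @Override
    public Tuple2<String, String> deserialize(byte[] messageKey, byte[] message,
            String topic, int partition, long offset) throws IOException {
        String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return Tuple2.of(key, value);
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
    }
}

You then pass an instance of it to the Kafka consumer constructor in place of the schema you use now.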


On 13.06.2017 13:19, AndreaKinn wrote:

But KeyedDeserializationSchema has just 2 implementations:

TypeInformationKeyValueSerializationSchema
JSONKeyValueDeserializationSchema


The first gives me this error:

06/12/2017 02:09:12 Source: Custom Source(4/4) switched to FAILED
java.io.EOFException at
org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)

while the JSONObject obviously doesn't fit my needs.

I'm thinking about implementing a custom deserializer, but honestly I'm a
newbie and I don't know how to start.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-get-keyed-messages-from-Kafka-tp13687p13689.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Task and Operator Metrics in Flink 1.3

2017-06-13 Thread Chesnay Schepler

The scopes look OK to me.

Let's try to narrow down the problem areas a bit:

1. Did this work with the same setup before 1.3?
2. Are all task/operator metrics available in the metrics tab of the
   dashboard?
3. Are there any warnings in the TaskManager logs from the
   MetricRegistry or StatsDReporter?

My *guess *would be that the operator/task metrics contain characters 
that either StatsD or telegraf don't allow,

which causes them to be dropped.

On 12.06.2017 20:32, Dail, Christopher wrote:


I’m using the Flink 1.3.0 release and am not seeing all of the metrics 
that I would expect to see. I have flink configured to write out 
metrics via statsd and I am consuming this with telegraf. Initially I 
thought this was an issue with telegraf parsing the data generated. I 
dumped all of the metrics going into telegraf using tcpdump and found 
that there was a bunch of data missing that I expect.


I’m using this as a reference for what metrics I expect:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html

I see all of the JobManager and TaskManager level metrics. Things like 
Status.JVM.* are coming through. TaskManager Status.Network are there 
(but not Task level buffers). The ‘Cluster’ metrics are there.


This IO section contains task and operator level metrics (like what is 
available on the dashboard). I’m not seeing any of these metrics 
coming through when using statsd.


I’m configuring flink with this configuration:

metrics.reporters: statsd

metrics.reporter.statsd.class: 
org.apache.flink.metrics.statsd.StatsDReporter


metrics.reporter.statsd.host: hostname

metrics.reporter.statsd.port: 8125

# Customized Scopes

metrics.scope.jm: flink.jm

metrics.scope.jm.job: flink.jm.

metrics.scope.tm: flink.tm.

metrics.scope.tm.job: flink.tm..

metrics.scope.task: 
flink.tm


metrics.scope.operator: 
flink.tm


I have tried with and without specifically setting the metrics.scope 
values.


Is anyone else having similar issues with metrics in 1.3?

Thanks

*Chris Dail*

Director, Software Engineering

*Dell EMC* | Infrastructure Solutions Group

mobile +1 506 863 4675

christopher.d...@dell.com 





Re: Task and Operator Metrics in Flink 1.3

2017-06-13 Thread Chesnay Schepler

Both your suggestions sound good, would be great to create JIRAs for them.

Could you replace the task scope format with the one below and try again?

metrics.scope.task: flink.tm

This scope doesn't contain any special characters, except the periods.
If you receive task metrics with this scope there are some other special 
characters we need to filter out.


Filtering characters in the StatsDReporter is always a bit icky though, 
since it supports many storage
backends with different requirements. The last-resort would be to filter 
out /all /special characters.


On 13.06.2017 13:41, Dail, Christopher wrote:


Responses to your questions:

 1. Did this work with the same setup before 1.3?

I have not tested it with another version. I started working on the 
metrics stuff with a snapshot of 1.3 and move to the release.


 2. Are all task/operator metrics available in the metrics tab of the
dashboard?

Yes, the metrics are seen from the dashboard.

 3. Are there any warnings in the TaskManager logs from the
MetricRegistry or StatsDReporter?

No, I am not seeing any errors in the logs related to metrics.

> My *guess *would be that the operator/task metrics contain characters 
that either StatsD or telegraf don't allow,

which causes them to be dropped.

This was my original thought too. I did find two separate issues with 
the metrics Flink outputs and I was planning on filing JIRA tickets on 
these. They are:


-Flink does not escape spaces. I had a space in the job name which 
messed up the metrics. So I have a workaround for this but it is 
probably something Flink should escape.


-Flink is outputting a value of “n/a” for 
lastCheckpointExternalPath. A gauge needs to be a float, so Telegraf 
does not like this. It errors on it and continues, ignoring it, though.


Note that even with these accounted for I am still not seeing the 
task/operator metrics. I ran a tcpdump to be sure on exactly what is 
coming through. Searching through that dump, I don’t see any of the 
metrics I was looking for.


I guess a few things to note. This is the application I am running:
https://github.com/chrisdail/pravega-samples/blob/master/flink-examples/src/main/scala/io/pravega/examples/flink/iot/TurbineHeatProcessor.scala

Also, I am running this in DC/OS 1.9 trying to integrate with DC/OS 
metrics.


Thanks

Chris

*From: *Chesnay Schepler <ches...@apache.org>
*Date: *Tuesday, June 13, 2017 at 5:26 AM
*To: *"user@flink.apache.org" <user@flink.apache.org>
*Subject: *Re: Task and Operator Metrics in Flink 1.3

The scopes look OK to me.

Let's try to narrow down the problem areas a bit:

 1. Did this work with the same setup before 1.3?
 2. Are all task/operator metrics available in the metrics tab of the
dashboard?
 3. Are there any warnings in the TaskManager logs from the
MetricRegistry or StatsDReporter?

My *guess *would be that the operator/task metrics contain characters 
that either StatsD or telegraf don't allow,

which causes them to be dropped.

On 12.06.2017 20:32, Dail, Christopher wrote:

I’m using the Flink 1.3.0 release and am not seeing all of the
metrics that I would expect to see. I have flink configured to
write out metrics via statsd and I am consuming this with
telegraf. Initially I thought this was an issue with telegraf
parsing the data generated. I dumped all of the metrics going into
telegraf using tcpdump and found that there was a bunch of data
missing that I expect.

I’m using this as a reference for what metrics I expect:


https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html

I see all of the JobManager and TaskManager level metrics. Things
like Status.JVM.* are coming through. TaskManager Status.Network
are there (but not Task level buffers). The ‘Cluster’ metrics are
there.

This IO section contains task and operator level metrics (like
what is available on the dashboard). I’m not seeing any of these
metrics coming through when using statsd.

I’m configuring flink with this configuration:

metrics.reporters: statsd

metrics.reporter.statsd.class:
org.apache.flink.metrics.statsd.StatsDReporter

metrics.reporter.statsd.host: hostname

metrics.reporter.statsd.port: 8125

# Customized Scopes

metrics.scope.jm: flink.jm

metrics.scope.jm.job: flink.jm.

metrics.scope.tm: flink.tm.

metrics.scope.tm.job: flink.tm..

metrics.scope.task:
flink.tm

metrics.scope.operator:
flink.tm

I have tried with and without specifically setting the
metrics.scope values.

Is anyone else having similar issues with metrics in 1.3?

Thanks

*Chris Dail*

Director, Software Engineering

*Dell EMC* | Infrastructure Solutions Group

mobile +1 506 863 4675

christopher.d...@dell.com <mailto:christopher.d...@dell.com>





Re: RichMapFunction setup method

2017-06-13 Thread Chesnay Schepler

The existing signature for open() is a remnant of the past.

We currently recommend to pass all arguments through the constructor and 
store them in fields.

You can of course also pass a Configuration containing all parameters.
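
A minimal sketch of the constructor approach (names are made up):

public class PrefixMapper extends RichMapFunction<String, String> {
    private final String prefix;

    public PrefixMapper(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public String map(String value) {
        return prefix + value;
    }
}

// usage:
// stream.map(new PrefixMapper("my-prefix"));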

On 13.06.2017 15:46, Mikhail Pryakhin wrote:

Hi all!

A RichMapFunction [1] provides a very handy setup method 
RichFunction#open(org.apache.flink.configuration.Configuration) which 
consumes a Configuration instance as an argument, but this argument 
doesn't bear any configuration parameters because it is always passed 
to the method as a new instance. [2] depicts the problem.


Is there any way to pass configuration parameters to the 
 RichFunction#open method via the Configuration parameter? Or is it a bug?

P.S. I'm using flink 1.3

Thanks in advance!

[1] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java#L111


Kind Regards,
Mike Pryakhin













Re: RichMapFunction setup method

2017-06-13 Thread Chesnay Schepler

I'm not aware of any plans to replace it.

For the Batch API it also works properly, so deprecating it would be 
misleading.


On 13.06.2017 16:04, Mikhail Pryakhin wrote:

Hi Chesnay,
Thanks for the reply,


The existing signature for open() is a remnant of the past.


Should the method be deprecated then so that it doesn’t confuse users?

Kind Regards,
Mike Pryakhin


On 13 Jun 2017, at 16:54, Chesnay Schepler <ches...@apache.org 
<mailto:ches...@apache.org>> wrote:


The existing signature for open() is a remnant of the past.

We currently recommend to pass all arguments through the constructor 
and store them in fields.

You can of course also pass a Configuration containing all parameters.

On 13.06.2017 15:46, Mikhail Pryakhin wrote:

Hi all!

A RichMapFunction [1] provides a very handy setup method 
RichFunction#open(org.apache.flink.configuration.Configuration) 
which consumes a Configuration instance as an argument, but this 
argument doesn't bear any configuration parameters because it is 
always passed to the method as a new instance. [2] depicts the problem.


Is there any way to pass configuration parameters to the 
 RichFunction#open method via the Configuration parameter? Or is it 
a bug?

P.S. I'm using flink 1.3

Thanks in advance!

[1] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java#L111


Kind Regards,
Mike Pryakhin

















Re: Flink Kinesis connector in 1.3.0

2017-06-13 Thread Chesnay Schepler

Here's the relevant JIRA: https://issues.apache.org/jira/browse/FLINK-6812

Apologies if I was unclear, i meant that you could use the 1.3-SNAPSHOT 
version of the kinesis connector, as it is compatible with 1.3.0.
Alternatively you can take the 1.3.0 sources and build the connector 
manually.


As far as I'm aware there is no plan to retroactively release a 1.3.0 
artifact.



I'm not aware of the missing httpclient dependency, pulling in Robert 
who may know something about it.



On 13.06.2017 21:00, Foster, Craig wrote:


So, in addition to the question below, can we be more clear on if 
there is a patch/fix/JIRA available since I have to use 1.3.0?


*From: *"Foster, Craig" <foscr...@amazon.com>
*Date: *Tuesday, June 13, 2017 at 9:27 AM
*To: *Chesnay Schepler <ches...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>

*Subject: *Re: Flink Kinesis connector in 1.3.0

Thanks! Does this also explain why commons HttpClient is not included 
in flink-dist-*?


*From: *Chesnay Schepler <ches...@apache.org>
*Date: *Tuesday, June 13, 2017 at 8:53 AM
*To: *"user@flink.apache.org" <user@flink.apache.org>
*Subject: *Re: Flink Kinesis connector in 1.3.0

Something went wrong during the release process which prevented the 
1.3.0 kinesis artifact from being released.


This will be fixed for 1.3.1, in the mean time you can use 
1.3.0-SNAPSHOT instead.


On 13.06.2017 17:48, Foster, Craig wrote:

Hi:

I’m trying to build an application that uses the Flink Kinesis
Connector in 1.3.0. However, I don’t see that resolving anymore.
It resolved with 1.2.x but doesn’t with 1.3.0. Is there something
I need to now do differently than described here?


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html

Thanks,

Craig





Re: Flink Kinesis connector in 1.3.0

2017-06-14 Thread Chesnay Schepler

Did you activate the "include-kinesis" maven profile when building?

On 13.06.2017 22:49, Foster, Craig wrote:


Oh, sorry. I’m not using distributed libraries but trying to build 
from source. So, using Maven 3.2.2 and building the connector doesn’t 
give me a jar for some reason.


*From: *Chesnay Schepler <ches...@apache.org>
*Date: *Tuesday, June 13, 2017 at 1:44 PM
*To: *"Foster, Craig" <foscr...@amazon.com>, "user@flink.apache.org" 
<user@flink.apache.org>, Robert Metzger <rmetz...@apache.org>

*Subject: *Re: Flink Kinesis connector in 1.3.0

Here's the relevant JIRA: 
https://issues.apache.org/jira/browse/FLINK-6812 
<https://issues.apache.org/jira/browse/FLINK-6812>


Apologies if I was unclear, i meant that you could use the 
1.3-SNAPSHOT version of the kinesis connector, as it is compatible 
with 1.3.0.
Alternatively you can take the 1.3.0 sources and build the connector 
manually.


As far as I'm aware there is no plan to retroactively release a 1.3.0 
artifact.



I'm not aware of the missing httpclient dependency, pulling in Robert 
who may know something about it.



On 13.06.2017 21:00, Foster, Craig wrote:

So, in addition to the question below, can we be more clear on if
there is a patch/fix/JIRA available since I have to use 1.3.0?

*From: *"Foster, Craig" <foscr...@amazon.com>
<mailto:foscr...@amazon.com>
*Date: *Tuesday, June 13, 2017 at 9:27 AM
*To: *Chesnay Schepler <ches...@apache.org>
<mailto:ches...@apache.org>, "user@flink.apache.org"
<mailto:user@flink.apache.org> <user@flink.apache.org>
<mailto:user@flink.apache.org>
*Subject: *Re: Flink Kinesis connector in 1.3.0

    Thanks! Does this also explain why commons HttpClient is not
included in flink-dist-*?

*From: *Chesnay Schepler <ches...@apache.org>
<mailto:ches...@apache.org>
*Date: *Tuesday, June 13, 2017 at 8:53 AM
*To: *"user@flink.apache.org" <mailto:user@flink.apache.org>
<user@flink.apache.org> <mailto:user@flink.apache.org>
*Subject: *Re: Flink Kinesis connector in 1.3.0

Something went wrong during the release process which prevented
the 1.3.0 kinesis artifact from being released.

This will be fixed for 1.3.1, in the mean time you can use
1.3.0-SNAPSHOT instead.

On 13.06.2017 17:48, Foster, Craig wrote:

Hi:

I’m trying to build an application that uses the Flink Kinesis
Connector in 1.3.0. However, I don’t see that resolving
anymore. It resolved with 1.2.x but doesn’t with 1.3.0. Is
there something I need to now do differently than described here?


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html

Thanks,

Craig





Re: Flink will delete all jars uploaded when restart jobmanager

2017-06-14 Thread Chesnay Schepler

There's currently no way to prevent this.

On 14.06.2017 07:03, XiangWei Huang wrote:

Hi,
When restarting the Flink JobManager, jars which were uploaded by the user from the web UI will be 
deleted.
Is there any way to avoid this?





Re: Latency and Throughput

2017-06-16 Thread Chesnay Schepler

Hello,

You don't have to measure anything yourself, since Flink exposes 
throughput/latency metrics as described in the System metrics/latency 
tracking sections of the metrics documentation.


You only have to setup a reporter that fetches these metrics (see the 
reporter section) and calculate sum/averages across the job.
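
If you additionally want a custom throughput meter inside one of your own functions, the 
pattern is roughly the following sketch (names are made up; it reuses the Dropwizard 
wrapper from flink-metrics-dropwizard):

public class MeteredMapper extends RichMapFunction<String, String> {
    private transient Meter throughput;

    @Override
    public void open(Configuration parameters) {
        this.throughput = getRuntimeContext()
                .getMetricGroup()
                .meter("recordsPerSecond", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
    }

    @Override
    public String map(String value) {
        throughput.markEvent(); // one event per processed record
        return value;
    }
}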


Regards,
Chesnay

On 16.06.2017 14:42, Paolo Cristofanelli wrote:

Hi,
this is the first question I am asking on this mailing list, so I 
hope you will forgive me if I miss something.
I have started using flink recently, and now I would like to compute 
some statistics, like throughput and latency, for my programs.
I was reading this URL from the documentation ( 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html 
), but I do not understand how to use meters and how to measure 
latency from these classes.
I have also asked on stack overflow. ( 
https://stackoverflow.com/questions/44587645/throughput-and-latency-on-apache-flink 
)


Thanks for your time,
Best Regards
Paolo





Re: Problem with WebUI

2017-06-11 Thread Chesnay Schepler
This looks like a dependency conflict to me. Try checking whether 
anything you use depends on netty.


On 09.06.2017 17:42, Dawid Wysakowicz wrote:

I had a look into yarn logs and I found such exception:

2017-06-09 17:10:20,922 ERROR
org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler
 - Caught exception
java.lang.AbstractMethodError
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
at

io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at

io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at

io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at

io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at

org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:105)
at

org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at

io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at

io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at

io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
at

io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at

io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at

io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at

io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at

io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at

io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
at

io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
at

io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at

io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)


Any idea how to tackle it?

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder




2017-06-09 16:17 GMT+02:00 Dawid Wysakowicz:


Hi,

I am trying to run a flink job on yarn. When I submit the job with
following command

Re: RichMapFunction setup method

2017-06-13 Thread Chesnay Schepler
It /is/ a remnant of the past since that method signature originates 
from the Record API,

the predecessor of the current DataSet API.

Even in the DataSet API you can just pass arguments through the constructor.
Feel free to open a JIRA, just make sure it is a subtask of FLINK-3957.

On 13.06.2017 16:40, Mikhail Pryakhin wrote:

Thanks a lot  Chesnay,

In case it works properly in the Batch API, don’t you think that it 
should not be called "remnant of the past“?
Should I create an issue so we don’t forget about it and maybe fix it 
in the future? I think I’m not the only one who deals with this method.


Kind Regards,
Mike Pryakhin


On 13 Jun 2017, at 17:20, Chesnay Schepler <ches...@apache.org 
<mailto:ches...@apache.org>> wrote:


I'm not aware of any plans to replace it.

For the Batch API it also works properly, so deprecating it would be 
misleading.


On 13.06.2017 16:04, Mikhail Pryakhin wrote:

Hi Chesnay,
Thanks for the reply,


The existing signature for open() is a remnant of the past.


Should the method be deprecated then so that it doesn’t confuse users?

Kind Regards,
Mike Pryakhin


On 13 Jun 2017, at 16:54, Chesnay Schepler <ches...@apache.org 
<mailto:ches...@apache.org>> wrote:


The existing signature for open() is a remnant of the past.

We currently recommend to pass all arguments through the 
constructor and store them in fields.

You can of course also pass a Configuration containing all parameters.
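
A minimal sketch of the constructor approach (the class and the 'prefix' 
parameter are made up for illustration):

    // Parameters are passed at job-assembly time and stored in a field;
    // the field is serialized together with the function.
    public class PrefixMapper extends RichMapFunction<String, String> {

        private final String prefix;

        public PrefixMapper(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String map(String value) {
            return prefix + value;
        }
    }

    // usage: stream.map(new PrefixMapper("demo-"))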

On 13.06.2017 15:46, Mikhail Pryakhin wrote:

Hi all!

A RichMapFunction [1] provides a very handy setup method 
RichFunction#open(org.apache.flink.configuration.Configuration) 
which consumes a Configuration instance as an argument, but this 
argument doesn't bear any configuration parameters because it is 
always passed to the method as a new instance. [2] depicts the 
problem.


Is there any way to pass configuration parameters to the 
 RichFunction#open method via the Configuration parameter? Or is 
it a bug?

P.S. I'm using flink 1.3

Thanks in advance!

[1] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java#L111


Kind Regards,
Mike Pryakhin





















Re: Flink Kinesis connector in 1.3.0

2017-06-13 Thread Chesnay Schepler
Something went wrong during the release process which prevented the 
1.3.0 kinesis artifact from being released.


This will be fixed for 1.3.1; in the meantime you can use 
1.3.0-SNAPSHOT instead.


On 13.06.2017 17:48, Foster, Craig wrote:


Hi:

I’m trying to build an application that uses the Flink Kinesis 
Connector in 1.3.0. However, I don’t see that resolving anymore. It 
resolved with 1.2.x but doesn’t with 1.3.0. Is there something I need 
to now do differently than described here?


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html

Thanks,

Craig





Re: Can ValueState use generics?

2017-05-08 Thread Chesnay Schepler
If you want to use generics you have to either provide a TypeInformation 
instead of a class, or create a class that extends Tuple2<Integer, 
ObjectNode> and use it as the class argument.
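
A sketch of the TypeInformation variant, assuming the Tuple2<Integer, 
ObjectNode> state from the snippet below (TypeInformation and TypeHint live 
in org.apache.flink.api.common.typeinfo):

    // TypeHint captures the full generic type, so no raw Tuple2.class is needed.
    state = getRuntimeContext().getState(
        new ValueStateDescriptor<>(
            "mystate",
            TypeInformation.of(new TypeHint<Tuple2<Integer, ObjectNode>>() {})));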


On 07.05.2017 15:14, yunfan123 wrote:

My process function is like :

 private static class MergeFunction extends
RichProcessFunction, Tuple2> {

 private ValueState> state;

 @Override
 @SuppressWarnings("unchecked")
 public void open(Configuration parameters) throws Exception {
 state = getRuntimeContext().getState(new
ValueStateDescriptor<>("mystate",
 (Class>)
(Object)Tuple2.class));
 }
}


When I running the code:
05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched
to FAILED
java.lang.RuntimeException: Cannot create full type information based on the
given class. If the type has generics, please
at
org.apache.flink.api.common.state.StateDescriptor.(StateDescriptor.java:124)
at
org.apache.flink.api.common.state.ValueStateDescriptor.(ValueStateDescriptor.java:101)
at
com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Tuple needs to be parameterized by using generics.
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673)
at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561)
at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557)
at
org.apache.flink.api.common.state.StateDescriptor.(StateDescriptor.java:122)
... 9 more

Can I use generics with ValueState?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Collector.collect

2017-05-01 Thread Chesnay Schepler

Oh you have multiple different output formats, missed that.

For the Batch API you are i believe correct, using a custom 
output-format is the best solution.


In the Streaming API the code below should be equally fast, if the 
filtered sets don't overlap.


input = ...
input.filter(conditionA).output(formatA)
input.filter(conditionB).output(formatB)

That is because all filters would be chained; hell, all sources might be 
as well (not too sure on this one).


On 01.05.2017 17:05, Newport, Billy wrote:


There is likely a bug then, the ENUM,Record stream to a filter to a 
set of outputformats per filter was slower than the BITMASK,Record to 
single OutputFormat which demux’s the data to each file internally


Are you saying do a custom writer inside a map rather than either of 
the 2 above approaches?


*From:*Chesnay Schepler [mailto:ches...@apache.org]
*Sent:* Monday, May 01, 2017 10:41 AM
*To:* user@flink.apache.org
*Subject:* Re: Collector.collect

Hello,

@Billy, what prevented you from duplicating/splitting the record, 
based on the bitmask, in a map function before the sink?
This shouldn't incur any serialization overhead if the sink is chained 
to the map. The emitted Tuple could also share the

GenericRecord; meaning you don't even have to copy it.

On 01.05.2017 14:52, Newport, Billy wrote:

We’ve done that but it’s very expensive from a serialization point
of view when writing the same record multiple times, each in a
different tuple.

For example, we started with this:

.collect(new Tuple<Short, GenericRecord>).

The record would be written with short = 0 and again with short =
1. This results in the GenericRecord being serialized twice. You
also prolly need filters on the output dataset which is expensive
also.

We switched instead to a bitmask. Now, we write the record once
and set bits in the short for each file the record needs to be
written to. Our next step is to write records to a file based on
the short. We wrote a new outputrecordformat which checks the bits
in the short and writes the GenericRecord to each file for the
corresponding bit. This means no filter to split the records for
each file and this is much faster.

We’re finding a need to do this kind of optimization pretty
frequently with flink.

*From:*Gaurav Khandelwal [mailto:gaurav671...@gmail.com]
*Sent:* Saturday, April 29, 2017 4:32 AM
*To:* user@flink.apache.org <mailto:user@flink.apache.org>
*Subject:* Collector.collect

Hello

I am working on RichProcessFunction and I want to emit multiple
records at a time. To achieve this, I am currently doing :

while(condition)

{

 Collector.collect(new Tuple<>...);

}

I was wondering, is this the correct way or there is any other
alternative.





Re: Collector.collect

2017-05-01 Thread Chesnay Schepler

Hello,

@Billy, what prevented you from duplicating/splitting the record, based 
on the bitmask, in a map function before the sink?
This shouldn't incur any serialization overhead if the sink is chained 
to the map. The emitted Tuple could also share the

GenericRecord; meaning you don't even have to copy it.
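
A rough sketch of what i mean (the 16-bit mask width is an assumption for 
illustration):

    // Fans one (bitmask, record) pair out into one (fileIndex, record) pair
    // per set bit, re-using the same GenericRecord instance for every tuple.
    public class BitmaskSplitter
            implements FlatMapFunction<Tuple2<Short, GenericRecord>, Tuple2<Short, GenericRecord>> {

        @Override
        public void flatMap(Tuple2<Short, GenericRecord> value,
                            Collector<Tuple2<Short, GenericRecord>> out) {
            for (short i = 0; i < 16; i++) {
                if ((value.f0 & (1 << i)) != 0) {
                    out.collect(Tuple2.of(i, value.f1)); // f1 is shared, not copied
                }
            }
        }
    }

If this flatMap is chained to the sink there is no serialization between 
them.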

On 01.05.2017 14:52, Newport, Billy wrote:


We’ve done that but it’s very expensive from a serialization point of 
view when writing the same record multiple times, each in a different 
tuple.


For example, we started with this:

.collect(new Tuple...);

}

I was wondering, is this the correct way or there is any other 
alternative.






Re: Problem: Please check that all IDs specified via `uid(String)` are unique.

2017-05-03 Thread Chesnay Schepler

Hello,

Was a uid set on "userRawDataStream", or any of its parent transformations?

On 03.05.2017 12:59, Rami Al-Isawi wrote:

Hi,

I am trying to set uids. I keep getting this (Flink.1.2):

Exception in thread "main" java.lang.IllegalArgumentException: Hash 
collision on user-specified ID. Most likely cause is a non-unique ID. 
Please check that all IDs specified via `uid(String)` are unique.


Here is the code snippet.
public DataStream processUserActions(DataStream 
userRawDataStream, Time duration, Sonar.ReportSpan span) {
 return userRawDataStream.map(userRaw->new 
UserActionValue(userRaw)).startNewChain().uid("11")
  
.keyBy("userRaw.eventRaw.env_key","userRaw.eventRaw.tag","userRaw.eventRaw."+span.name(),"userRaw.type")
 .window(GlobalWindows.create())
 .trigger(new TimedTrigger(duration))
 .sum("count")//.startNewChain().uid("") .map(userActionsCount->new 
UserAction(userActionsCount,span));//.startNewChain().uid(""); }
I made sure that they are unique and even with one uid, the error is there. I 
guess there is an easy fix, but I cannot see it.
Regards,
-Rami
Disclaimer: This message and any attachments thereto are intended 
solely for the addressed recipient(s) and may contain confidential 
information. If you are not the intended recipient, please notify the 
sender by reply e-mail and delete the e-mail (including any 
attachments thereto) without producing, distributing or retaining any 
copies thereof. Any review, dissemination or other use of, or taking 
of any action in reliance upon, this information by persons or 
entities other than the intended recipient(s) is prohibited. Thank you. 





Re: Flink Graphire Reporter stops reporting via TCP if network issue

2017-05-05 Thread Chesnay Schepler

Hello,

for Graphite, Flink uses the DropWizard metrics reporter. I don't know 
at the moment whether it supports any kind of reconnecting functionality.


I'm not sure whether i understood you correctly; did you try upgrading 
the DropWizard metrics-core/metrics-graphite dependencies?


If that didn't do the trick we could in fact implement this in Flink, it 
would be a hack though. When an error occurs we can simply re-instantiate 
the reporter, but we would have to know how the reporter communicates 
the connection drop; i.e. whether it throws some exception or not.


Could you check the log for any warning statements from the MetricRegistry?

Regards,
Chesnay

On 05.05.2017 13:26, Bruno Aranda wrote:

Hi,

We are using the Graphite reporter from Flink 1.2.0 to send the 
metrics via TCP. Due to our network configuration we cannot use UDP at 
the moment.


We have observed that if there is any problem with graphite our the 
network, basically, the TCP connection times out or something, the 
metrics reporter does not recover. This is easy to reproduce by 
blocking the port we are sending the metrics using iptables. If we 
block the port for more than a minute or so, the problem will happen. 
After the port is re-open, Flink does not continue like before.


Is this a known issue? Googling shows some problems with the 
metrics-graphite package that should have been solved already. We have 
trying updated metrics-core/graphite to the latest with no success.


Any ideas?

Thanks!

Bruno





Re: Collector.collect

2017-05-02 Thread Chesnay Schepler

In the Batch API only a single operator can be chained to another operator.

So we're starting with this code:

   input = ...
   input.filter(conditionA).output(formatA)
   input.filter(conditionB).output(formatB)

In the Batch API this would create a CHAIN(filterA -> formatA) and a 
CHAIN(filterB -> formatB), both having "input" as their input.
Since the filtering is not done as part of "input" the entire input 
DataSet must be sent to both tasks.
This means that both chains have to deserialize the entire DataSet to 
apply the filter; the serialization should only be done once though.


In contrast the solution you wrote creates a single CHAIN(input, 
format), with no serialization in between at all.


The Streaming API doesn't have this limitation and would get by without 
any serialization as well. Probably.


On 02.05.2017 15:23, Newport, Billy wrote:


Why doesn’t this work with batch though. We did

input = ...
input.filter(conditionA).output(formatA)
input.filter(conditionB).output(formatB)

And it was pretty slow compared with a custom outputformat with an 
integrated filter.


*From:*Chesnay Schepler [mailto:ches...@apache.org]
*Sent:* Monday, May 01, 2017 12:56 PM
*To:* Newport, Billy [Tech]; 'user@flink.apache.org'
*Subject:* Re: Collector.collect

Oh you have multiple different output formats, missed that.

For the Batch API you are i believe correct, using a custom 
output-format is the best solution.


In the Streaming API the code below should be equally fast, if the 
filtered sets don't overlap.


input = ...
input.filter(conditionA).output(formatA)
input.filter(conditionB).output(formatB)

That is because all filters would be chained; hell, all sources might 
be as well (not too sure on this one).


On 01.05.2017 17:05, Newport, Billy wrote:

There is likely a bug then, the ENUM,Record stream to a filter to
a set of outputformats per filter was slower than the
BITMASK,Record to single OutputFormat which demux’s the data to
each file internally

Are you saying do a custom writer inside a map rather than either
of the 2 above approaches?

    *From:*Chesnay Schepler [mailto:ches...@apache.org]
*Sent:* Monday, May 01, 2017 10:41 AM
*To:* user@flink.apache.org <mailto:user@flink.apache.org>
*Subject:* Re: Collector.collect

Hello,

@Billy, what prevented you from duplicating/splitting the record,
based on the bitmask, in a map function before the sink?
This shouldn't incur any serialization overhead if the sink is
chained to the map. The emitted Tuple could also share the
GenericRecord; meaning you don't even have to copy it.

On 01.05.2017 14:52, Newport, Billy wrote:

We’ve done that but it’s very expensive from a serialization
point of view when writing the same record multiple times,
each in a different tuple.

For example, we started with this:

.collect(new Tuple<Short, GenericRecord>).

The record would be written with short = 0 and again with
short = 1. This results in the GenericRecord being serialized
twice. You also prolly need filters on the output dataset
which is expensive also.

We switched instead to a bitmask. Now, we write the record
once and set bits in the short for each file the record needs
to be written to. Our next step is to write records to a file
based on the short. We wrote a new outputrecordformat which
checks the bits in the short and writes the GenericRecord to
each file for the corresponding bit. This means no filter to
split the records for each file and this is much faster.

We’re finding a need to do this kind of optimization pretty
frequently with flink.

*From:*Gaurav Khandelwal [mailto:gaurav671...@gmail.com]
*Sent:* Saturday, April 29, 2017 4:32 AM
*To:* user@flink.apache.org <mailto:user@flink.apache.org>
*Subject:* Collector.collect

Hello

I am working on RichProcessFunction and I want to emit
multiple records at a time. To achieve this, I am currently
doing :

while(condition)

{

 Collector.collect(new Tuple<>...);

}

I was wondering, is this the correct way or there is any other
alternative.





Re: MapR libraries shading issue

2017-06-28 Thread Chesnay Schepler

I would say that this is a MapR issue.

It's a good idea to add it to the docs in case someone else stumbles 
upon this.

Would be great if you could open a JIRA for that.

On 27.06.2017 19:35, ani.desh1512 wrote:

Again as I mentioned in the MapR thread,

So, after some more digging, I found out that you can make flink use the
default java truststore by passing
-Djavax.net.ssl.trustStore=$JAVA_HOME/jre/lib/security/cacerts as JVM_ARGS
for Flink.
I tested this approach with AWS, datadog along with MapR Streams and Tables
and it seems to have worked as of now.

I am not sure if this is the right approach, but if it indeed is then we
should include it in the Flink Mapr documentation.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/MapR-libraries-shading-issue-tp13988p14027.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Custom Serializers

2017-09-18 Thread Chesnay Schepler
If Parameters are always encapsulated in an Event, and the Event 
serializer knows how to deal with them, then you

only need to implement a serializer etc. for the Event class.

On 18.09.2017 13:20, nragon wrote:

Sorry for bringing this up, any tips on this?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Custom Serializers

2017-09-18 Thread Chesnay Schepler

you do need them, but only for the Event class.

On 18.09.2017 13:38, nragon wrote:

So, no need for typeinfo, comparator or factory?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#defining-type-information-using-a-factory



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Load distribution through the cluster

2017-09-20 Thread Chesnay Schepler

It should only apply to the map operator.

On 19.09.2017 17:38, AndreaKinn wrote:

If I apply a sharing slot as in the example:

DataStream LTzAccStream = env
.addSource(new FlinkKafkaConsumer010<>("topic", 
new
CustomDeserializer(), properties))
.assignTimestampsAndWatermarks(new 
CustomTimestampExtractor())
.map(new MapFunction, 
Event>(){
   @Override
public Event map(Tuple2 
value) throws Exception {
return new Event(value.f0, 
value.f1);
}
}).slotSharingGroup("group1");

just the map operator is assigned to the shared slot or it happens for the
entire block (addSource + assignTimestamp + map)?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: NoResourceAvailable exception

2017-09-14 Thread Chesnay Schepler

The error message says that the total number of slots is 0.
It is thus very likely that no TaskManager is connected to the JobManager.

How exactly are you starting the cluster?

On 14.09.2017 18:03, AndreaKinn wrote:

Hi,
I'm executing a program on a flink cluster.
I tried the same on a local node with Eclipse and it worked fine.

To start, following Flink recommendations on the cluster I set
numberOfTaskSlots equals to the Cpu cores (2) while I set parallelism to 1.
Unfortunately when I try to execute I obtain


Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration. Task to schedule: < Attempt #1 (Source: Custom Source ->
Timestamps/Watermarks (1/1)) @ (unassigned) - [SCHEDULED] > with groupID <
cbc357ccb763df2852fee8c4fc7d55f2 > in sharing group < SlotSharingGroup
[e883208d19e3c34f8aaf2a3168a63337, 9dd63673dd41ea021b896d5203f3ba7c,
cbc357ccb763df2852fee8c4fc7d55f2] >. Resources available to scheduler:
Number of instances=0, total number of slots=0, available slots=0


As you can see it says I have 0 available slots... how is this possible?!?
I set no chains or sharingGroups in the code.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Get EOF from PrometheusReporter in JM

2017-09-22 Thread Chesnay Schepler

The Prometheus reporter should work with 1.3.2.

Does this also occur with the reporter that currently exists in 1.4? (to 
rule out new bugs from the PR).


To investigate this further, please set the logging level to WARN and 
try again, as all errors in the metric system are logged on that level.


On 22.09.2017 10:33, Tony Wei wrote:

Hi,

I have built the Prometheus reporter package from this PR 
https://github.com/apache/flink/pull/4586, and used it on Flink 1.3.2 
to record every default metrics and those from `FlinkKafkaConsumer`.


Originally, everything was fine. I could get those metrics in TM from 
Prometheus just like I saw on Flink Web UI.
However, when I turned to JM, I found Prometheus gives this error to 
me: Get http://localhost:9249/metrics: EOF.
I checked the log on JM and saw nothing in it. There was no error 
message and 9249 port was still alive.


To figure out what happened, I created another cluster and I found 
Prometheus could connect to Flink cluster if there is no running job. 
After JM triggered or completed the first checkpoint, Prometheus 
started getting ERR_EMPTY_RESPONSE from JM, but not for TM. There was 
still no error in log file and 9249 port was still alive.


I was wondering where did the error occur. Flink or Prometheus reporter?
Or It is incorrect to use Prometheus reporter on Flink 1.3.2 ? Thank you.

Best Regards,
Tony Wei





Re: Custom Serializers

2017-09-19 Thread Chesnay Schepler
Have a look at the TupleTypeInfo class. It has a constructor that 
accepts an array of TypeInformation,

and supports automatically generating a serializer from them.
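
A small sketch (the field types are just an example; in practice they would 
come from your user input):

    TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO
    };

    TupleTypeInfo<Tuple3<String, Integer, Double>> tupleType =
        new TupleTypeInfo<>(fieldTypes);

    TypeSerializer<Tuple3<String, Integer, Double>> serializer =
        tupleType.createSerializer(new ExecutionConfig());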

On 18.09.2017 18:28, nragon wrote:

One other thing :). Can i set tuple generic type dynamically?
Meaning, build a tuple of N arity and build TupleSerializer based on those
types.
This because I'll only know these types based on user inputs.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Can't send data to another service in addSink

2017-09-18 Thread Chesnay Schepler
Please read the Basic API concepts guide in the documentation, in 
particular 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#lazy-evaluation.


The short answer is that main() is called on the client, while the sink 
is executed on a taskmanager, i.e. in a different JVM. The sink must thus 
be self-contained, i.e. it must call connectToMqttServer() itself.
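
For illustration, a self-contained sink could look roughly like this 
(MqttConnection is a placeholder for whatever client type your 
connectToMqttServer() returns):

    public class MqttSink extends RichSinkFunction<String> {

        // placeholder for your real MQTT client handle
        private transient MqttConnection connection;

        @Override
        public void open(Configuration parameters) throws Exception {
            // runs on the taskmanager, i.e. where the data is actually sent
            connection = connectToMqttServer();
        }

        @Override
        public void invoke(String value) throws Exception {
            connection.publish(value);
        }

        @Override
        public void close() throws Exception {
            if (connection != null) {
                connection.close();
            }
        }
    }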


On 18.09.2017 06:00, Duy Truong wrote:

Hi

In my flink program, I want to send data to a server via MQTT 
protocol, here is my code||


https://gist.github.com/duytruong/d240958d2c4140b554b604cbef05edc7 



The problem is it ran well on IntelliJ, but when submitted to cluster 
(bin/start-local.sh), it could connect but failed to send data. When 
I've tried to move connectToMqttServer() to addSink(), it worked (on 
cluster), so I have 2 questions:


1. Why it could connect but failed to send data, I guess the cause is 
addSink() is executed in different thread from the 
connectToMqttServer's thread but I can't find any document about it.


2. Why it ran well in IntelliJ but failed on cluster (I didn't move 
connectToMqttServer() to addSink() in this case).


Thanks,

--
/Duy Truong/





Re: Weird error in submitting a flink job to yarn cluster

2017-10-04 Thread Chesnay Schepler

This isn't related to FLink but i might be able to help you out anyway.

Does the ParquetFileWriter set the 'overwrite' flag when calling 
'FileSystem#create()'?


My suspicion is that you create a file for the first batch, write it 
out, but not delete it.
For the next batch, the file cannot be created (since it still exists) 
and thus fails.


Since the application now crashes the /tmp directory probably gets 
cleaned up, which is why you don't see

any leftover file.

To verify this theory you can add a simple counter to your sink for the 
number of created files. It should succeed
for the first batch and fail on the second one. In this case you should 
make sure that the file is deleted after the first

batch has been written.

On 03.10.2017 08:01, vipul singh wrote:

Hello,

I am working on a ParquetSink writer, which will convert a kafka 
stream to parquet format. I am having some weird issues in deploying 
this application to a yarn cluster. I am not 100% sure this falls into 
a flink related error, but I wanted to reach out to folks here incase 
it might be.



If I launch Flink within YARN only for executing a single job, it runs 
ok. This is the command I use for the deployment:


*Command:* flink run --jobmanager yarn-cluster -ytm 4096 -yjm 1048 -ys 
2 -yn 2 -d -c  jar_name.jar


However as soon as I try to submit a similar job to a already running 
yarn cluster, I start to get these 
errors(_https://gist.github.com/neoeahit/f0130e9f447ea9c2baa38bf5ee4e6a57_) 
and application crashes. I checked the location in /tmp, where I am 
creating the file, and there is no file existing there.


*Command:* flink run -yid application_id -d -c 
 jar_name.jar



A bit more about my algorithm, I use a temp array to buffer messages 
in the @invoke method, and when specific threshold are reached I 
create a parquet file with this buffered data. Once a tmp parquet file 
is created, I upload this file to long term storage.


The code to write buffered data to a parquet file is:

  writer =Some(AvroParquetWriter.builder(getPendingFilePath(tmp_filename.get))
.withSchema(schema.get)
.withCompressionCodec(compressionCodecName)
.withRowGroupSize(blockSize)
.withPageSize(pageSize)
.build())
bufferedMessages.foreach { e =>
   writer.get.write(e.payload)
}
writer.get.close()

Please do let me know.

Thanking in advance,
- Vipul






Re: Fw: Question on Flink on Window

2017-10-04 Thread Chesnay Schepler
You're attempting to start flink from the wrong directory, specifically 
from within the source directory.


If you download Flink's source release from the downloads page you have to 
build it manually, in which case the 'start-local.bat' to run will be 
located under 'flink\flink-dist\target\flink-1.3.2\flink-1.3.2\bin'.

You can also download a pre-built release from the same page (under 
'Binaries'). After extracting it you will find the 'bin' directory.


On 03.10.2017 12:19, Tay Zhen Shen wrote:






*From:* Tay Zhen Shen
*Sent:* Tuesday, 3 October, 2017 1:03 PM
*To:* user@flink.apache.org
*Subject:* Question on Flink on Window

Hi,


I'm currently trying to setup Flink 1.3.2 on my Windows 10. When i was 
running the start-local.bat file, the command prompt shows that i have 
missing files. I tracked everything and i found that in the 
start-local.bat, there are configurations which points to bin/../lib. 
I search through the file downloaded from the site and i couldn't 
find any lib file. How can i solve this?



Thank you.




Regards,

Tay Zhen Shen

Student of USM (Malaysia)





Re: Classloader error after SSL setup

2017-10-04 Thread Chesnay Schepler
Something that would also help us narrow down the problematic area is to 
enable SSL for one component at a time and see which one causes the job 
to fail.

On 04.10.2017 14:11, Chesnay Schepler wrote:
The configuration looks reasonable. Just to be sure, are the paths 
accessible by all nodes?


As a first step, could you set the logging level to DEBUG (by 
modifying the 'conf/log4j.properties' file), resubmit the job (after a 
cluster restart) and check the Job- and TaskManager logs for any 
exception?


On 04.10.2017 03:15, Aniket Deshpande wrote:
Background: We have a setup of Flink 1.3.1 along with a secure MAPR 
cluster (Flink is running on mapr client nodes). We run this flink 
cluster via flink-jobmanager.sh foreground and flink-taskmanager.sh 
foreground command via Marathon.  In order for us to make this work, 
we had to add 
-Djavax.net.ssl.trustStore="$JAVA_HOME/jre/lib/security/cacerts" in 
flink-console.sh as extra JVM arg 
(otherwise, flink was taking MAPR's ssl_truststore as default 
truststore and then we were facing issues for any 3rd party jars like 
aws_sdk etc.). This entire setup was working fine as it is and we 
could submit our jars and the pipelines ran without any problem



Problem: We started experimenting with enabling ssl for all 
communication for Flink. For this, we followed 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/security-ssl.html for 
generating CA and keystore. I added the following properties to 
flink-conf.yaml:



security.ssl.enabled: true
security.ssl.keystore: /opt/flink/certs/node1.keystore
security.ssl.keystore-password: 
security.ssl.key-password: 
security.ssl.truststore: /opt/flink/certs/ca.truststore
security.ssl.truststore-password: 
jobmanager.web.ssl.enabled: true
taskmanager.data.ssl.enabled: true
blob.service.ssl.enabled: true
akka.ssl.enabled: true


We then spin up a cluster and tried submitting the same job which was 
working before. We get the following erros:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
load user class: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:229) 

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:95) 

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:230) 


at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)


This error disappears when we remove the ssl config properties i.e 
run flink cluster without ssl enabled.



So, did we miss any steps for enabling ssl?


P.S.: We tried removing the extra JVm arg mentioned above, but still 
get the same error.


--

Aniket







Re: Classloader error after SSL setup

2017-10-04 Thread Chesnay Schepler
The configuration looks reasonable. Just to be sure, are the paths 
accessible by all nodes?


As a first step, could you set the logging level to DEBUG (by modifying 
the 'conf/log4j.properties' file), resubmit the job (after a cluster 
restart) and check the Job- and TaskManager logs for any exception?


On 04.10.2017 03:15, Aniket Deshpande wrote:
Background: We have a setup of Flink 1.3.1 along with a secure MAPR 
cluster (Flink is running on mapr client nodes). We run this flink 
cluster via flink-jobmanager.sh  
foreground and flink-taskmanager.sh  
foreground command via Marathon.  In order for us to make this work, 
we had to add -Djavax.net 
.ssl.trustStore="$JAVA_HOME/jre/lib/security/cacerts" in 
flink-console.sh  as extra JVM arg 
(otherwise, flink was taking MAPR's ssl_truststore as default 
truststore and then we were facing issues for any 3rd party jars like 
aws_sdk etc.). This entire setup was working fine as it is and we 
could submit our jars and the pipelines ran without any problem



Problem: We started experimenting with enabling ssl for all 
communication for Flink. For this, we followed 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/security-ssl.html for 
generating CA and keystore. I added the following properties to 
flink-conf.yaml:



security.ssl.enabled: true
security.ssl.keystore: /opt/flink/certs/node1.keystore
security.ssl.keystore-password: 
security.ssl.key-password: 
security.ssl.truststore: /opt/flink/certs/ca.truststore
security.ssl.truststore-password: 
jobmanager.web.ssl.enabled: true
taskmanager.data.ssl.enabled: true
blob.service.ssl.enabled: true
akka.ssl.enabled: true


We then spin up a cluster and tried submitting the same job which was 
working before. We get the following erros:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
load user class: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:229) 

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:95) 

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:230) 


at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)


This error disappears when we remove the ssl config properties i.e run 
flink cluster without ssl enabled.



So, did we miss any steps for enabling ssl?


P.S.: We tried removing the extra JVm arg mentioned above, but still 
get the same error.


--

Aniket





Re: javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated for S3 access

2017-10-04 Thread Chesnay Schepler
I've found a few threads where an outdated jdk version on the 
server/client may be the cause.


Which Flink binary (specifically, for which hadoop version) are you using?

On 03.10.2017 20:48, Hao Sun wrote:

com.amazonaws.http.AmazonHttpClient   - Unable to 
execute HTTP request: peer not authenticated
javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated
at 
sun.security.ssl.SSLSessionImpl.getPeerCertificates(SSLSessionImpl.java:431)





Re: Writing an Integration test for flink-metrics

2017-10-13 Thread Chesnay Schepler
I meant that you could unit-test the behavior of the function in 
isolation. You could create a dummy metric group that
verifies that the correct counters are being registered (based on names 
i guess), as well as provide access to them.

Mock some input and observe whether the counter value is being modified.

Whether this is a viable option depends a bit on the complexity of the 
function, of course; that is, on how much mocking you would have to do.
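
Roughly, such a test could look like this (assuming Mockito, and a made-up 
MyMapFunction that registers a counter named "processed" in open() and 
increments it in map()):

    RuntimeContext ctx = Mockito.mock(RuntimeContext.class);
    MetricGroup group = Mockito.mock(MetricGroup.class);
    SimpleCounter processed = new SimpleCounter();

    Mockito.when(ctx.getMetricGroup()).thenReturn(group);
    Mockito.when(group.counter("processed")).thenReturn(processed);

    MyMapFunction function = new MyMapFunction();
    function.setRuntimeContext(ctx);
    function.open(new Configuration());

    function.map("some record");

    Assert.assertEquals(1, processed.getCount());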

On 13.10.2017 11:18, Piotr Nowojski wrote:
For testing Link applications in general you can read 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html


However as we said before, testing metrics would require using custom 
or a imx reporter.


Yes, please report this bug in Jira.

Thanks, Piotrek

On 13 Oct 2017, at 04:31, Colin Williams 
<colin.williams.seat...@gmail.com 
<mailto:colin.williams.seat...@gmail.com>> wrote:


Team wants an integration test, I'm not sure what unit test you had 
in mind. Actually feel that I've been trying to avoid the reporter 
method but that would be more end to end.


The documentation for metrics and Scala are missing with the 
exception of Gauge: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html 
. Should I file an issue against that?


Then it leaves you guessing a little bit how to implement Counters. 
One approach tried was using objects


object PointFilter extends RichMapFunction[...
   @transient lazy val someCounter = 
getRuntimeContext.getMetricGroup.counter(...)

This allowed access to the counter before and after execution. However, 
between the unit tests the Counter also kept its value, and that's a 
no-go for the test. Think that might be an issue with ScalaTest.


I've tried to get at the counter from some other directions like 
trying to find a way to inject a reporter to get it's state. But 
don't see a way to do it. So probably the best thing to do is fire up 
something to collect the metrics from the reporter.


On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler <ches...@apache.org 
<mailto:ches...@apache.org>> wrote:


Well damn, i should've read the second part of the initial mail.

I'm wondering though, could you not unit-test this behavior?


    On 12.10.2017 14:25, Chesnay Schepler wrote:

You could also write a custom reporter that opens a socket or
similar for communication purposes.

You can then either query it for the metrics, or even just
trigger the verification in the reporter,
and fail with an error if the reporter returns an error.

On 12.10.2017 14:02, Piotr Nowojski wrote:

Hi,

Doing as you proposed using JMXReporter (or custom
reporter) should work. I think there is no easier way to
do this at the moment.

Piotrek

On 12 Oct 2017, at 04:58, Colin Williams
<colin.williams.seat...@gmail.com
<mailto:colin.williams.seat...@gmail.com>> wrote:

I have a RichMapFunction and I'd like to ensure Meter
fields are properly incremented. I've been trying to
think of the best way to do this. Currently I think
that I'd need to either implement my own reporter (or
use JMX) and write to a socket, create a listener and
wait for the reporter to send the message.

Is this a good approach for writing the test, or
should I be considering something else?












Re: RichMapFunction parameters in the Streaming API

2017-10-11 Thread Chesnay Schepler
The Configuration parameter in open() is a relic of the previous java 
API where operators were instantiated generically.


Nowadays, this is no longer the case as they are serialized instead, 
which simplifies the passing of parameters as you can

simply store them in a field of your UDF.

The configuration object passed to open() in case of the streaming API 
is always empty, and we don't plan

to implement it since it provides little value due to the above.

As such, we suggest to pass either the parameter tool, configuration 
instance or specific parameters through the constructor of user-defined 
functions and store them in a field. This applies both to the batch and 
streaming API.


Personally i would stay away from the global configuration option as it 
is more brittle than the constructor approach, which makes

it explicit that this function requires these parameters.

On 11.10.2017 00:36, Colin Williams wrote:
I was looking for withParameters(config) in the Streaming API today. I 
stumbled across the following thread.


http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/withParameters-for-Streaming-API-td9332.html#a9333 



It appears that some of the StreamingAPI developers are in favor of 
removing the parameters from RichMapFunctions' open. However the best 
practices article


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program

Show examples of using both global configuration (where parameters are 
available from open) and withParameters(config) (which doesn't work 
from the Streaming API)


I'm trying to make a decision regarding using global parameters with 
my Flink Streaming jobs.


Is using the global configuration a good idea for parameters in the 
Streaming API or is this best practice just suggested for the Batch API?


Is there a reason for the opinion of removing the configuration 
parameters from open?








Re: Writing an Integration test for flink-metrics

2017-10-12 Thread Chesnay Schepler
You could also write a custom reporter that opens a socket or similar 
for communication purposes.


You can then either query it for the metrics, or even just trigger the 
verification in the reporter,

and fail with an error if the reporter returns an error.

On 12.10.2017 14:02, Piotr Nowojski wrote:

Hi,

Doing as you proposed using JMXReporter (or custom reporter) should work. I 
think there is no easier way to do this at the moment.

Piotrek


On 12 Oct 2017, at 04:58, Colin Williams  
wrote:

I have a RichMapFunction and I'd like to ensure Meter fields are properly 
incremented. I've been trying to think of the best way to do this. Currently I 
think that I'd need to either implement my own reporter (or use JMX) and write 
to a socket, create a listener and wait for the reporter to send the message.

Is this a good approach for writing the test, or should I be considering 
something else?






Re: Writing an Integration test for flink-metrics

2017-10-12 Thread Chesnay Schepler

Well damn, i should've read the second part of the initial mail.

I'm wondering though, could you not unit-test this behavior?

On 12.10.2017 14:25, Chesnay Schepler wrote:
You could also write a custom reporter that opens a socket or similar 
for communication purposes.


You can then either query it for the metrics, or even just trigger the 
verification in the reporter,

and fail with an error if the reporter returns an error.

On 12.10.2017 14:02, Piotr Nowojski wrote:

Hi,

Doing as you proposed using JMXReporter (or custom reporter) should 
work. I think there is no easier way to do this at the moment.


Piotrek

On 12 Oct 2017, at 04:58, Colin Williams 
<colin.williams.seat...@gmail.com> wrote:


I have a RichMapFunction and I'd like to ensure Meter fields are 
properly incremented. I've been trying to think of the best way to 
do this. Currently I think that I'd need to either implement my own 
reporter (or use JMX) and write to a socket, create a listener and 
wait for the reporter to send the message.


Is this a good approach for writing the test, or should I be 
considering something else?









Re: metrics for Flink sinks

2017-08-29 Thread Chesnay Schepler

Hello,

1. Because no one found time to fix it. In contrast to the remaining 
byte/record metrics, input metrics for sources / output metrics for 
sinks have to be implemented for every single implementation with their 
respective semantics. In contrast, the output metrics are gathered in 
the intersection between operators, independent of the actual operator 
implementation. Furthermore, this requires system metrics (i.e. metrics 
that Flink itself creates) to be exposed (and be mutable!) to 
user-defined functions, which is something i /generally/ wanted to 
avoid, but it appears to be a big enough pain point to make an exception 
here.


2. Due to the above it is currently not possible, without modifying the code, 
to know how many reads/writes were made; one workaround is to wrap the sink 
yourself (see the sketch below).


3. Do you mean aggregated metrics? The web UI allows the aggregation of 
record/byte metrics on the task level. Beyond that we defer aggregation 
to actual time-series databases that specialize in these things.


On 28.08.2017 19:08, Martin Eden wrote:

Hi all,

Just 3 quick questions both related to Flink metrics, especially 
around sinks:


1. In the Flink UI Sources always have 0 input records / bytes and 
Sinks always have 0 output records / bytes? Why is it like that?


2. What is the best practice for instrumenting off the shelf Flink sinks?

Currently the only metrics available are num records/bytes in and out 
at the operator and task scope. For the task scope there are extra 
buffer metrics. However the output metrics are always zero (see 
question 1). How can one know the actual number of successful writes 
done by an off the shelf Flink sink? Or the latency of the write 
operation?


3. Is it possible to configure Flink to get global job metrics for all 
subtasks of an operator? Or are there any best practices around that?


Thanks,
M





Re: Update timeWindow size and trigger value at runtime

2017-09-11 Thread Chesnay Schepler

You cannot change the size/trigger count while a job is running.

For this to work you will have to take a savepoint, modify the 
parameters and reload from the savepoint.


On 11.09.2017 09:27, victor.reut wrote:

Hi,

I want to have an opportunity to update timeWindow size and trigger value in
KeyedStream dynamically at runtime. For example, I have such a piece of
code:

 DataStream stream = env.addSource(new
FlinkKafkaConsumer09<>(TOPIC, new JSONDeserializer(), properties));

 Integer numMinutes = ...
 Integer triggersCount = ...

 stream.keyBy("key")
 .timeWindow(Time.minutes(numMinutes))
 .trigger(CountTrigger.of(triggersCount))
 .reduce(new MetricsReduceFunction() , new
MetricsTimeWindowReduceFunction()).print();

If I just change the values of variables numMinutes and triggersCount, Flink
does not update them. Also I haven't found a good solution in google.
Does anybody know how to solve this issue?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Is State access synchronized?

2017-09-11 Thread Chesnay Schepler

Hello,

state is local to each parallel instance of an operator. Coupled with 
the fact that the "map" method is always called by the same thread (and 
never concurrently) the ValueState (or any state for that matter) will 
always return the latest values.


On 10.09.2017 14:39, Federico D'Ambrosio wrote:

Hi,

as per the mail subject I wanted to ask you if a State access (read 
and write) is synchronized.


I have the following stream:

val airtrafficEvents = stream
.keyBy(_.flightInfo.flight)
.map(new UpdateIdFunction())


where UpdateIdFunction is a RichMapFunction with a ValueState and a 
MapState, with the following map method


def map(value: AirTrafficEvent): AirTrafficEventWithId = {

  val flight = value.flightInfo.flight
  val time = value.instantValues.time

  AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis))

}

private def createOrGetId(_key: String, _time: Long): Int = {

  val tmpId = valuestate.value

  //Remove from MapState entries older than one minute

  val entry = Option[(Int, Long)](lookupMap.get(_key))

  //update ValueState or MapState if needed

  //return current updated ValueState or corresponding ID from updated 
MapState


}

So, I'm using the MapState to track the integer IDs of the events of 
the stream,retaining only the latest records inside the MapState, and 
I'm using the ValueState to generate an incremental integer ID for 
said events.
Given all of this, I'm really not sure how the mapping is applied to 
the keyedstream in input: is it guaranteed that each time the method 
is called I'm getting the latest and updated value/map?


Thank you for your attention,
Federico





Re: Delay in Flink timers

2017-09-11 Thread Chesnay Schepler
It is true that onTimer and processElement are never called at the same 
time.


I'm not entirely sure whether there is any prioritization/fairness 
between these methods (if not, it could be that onTimer is starved), 
looping in Aljoscha who hopefully knows more about this.

On 10.09.2017 09:31, Narendra Joshi wrote:

Hi,

We are using Flink as a timer scheduler and delay in timer execution is
a huge problem for us. What we have experienced is that as the number of
Timers we register increases the timers start getting delayed (for more
than 5 seconds). Can anyone point us in the right direction to figure
out what might be happening?

I have been told that `onTimer` and `processElement` are called with a
mutually exclusive lock. Could this locking be the reason this is
happening? In both the functions there is no IO happening and it should
not take 5 seconds.

Is it possible that calls to `processElement` starve `onTimer` calls?


--
Narendra Joshi





Re: Best way to deriving streams from another one

2017-09-11 Thread Chesnay Schepler
Have a look at side outputs in the documentation, they allow you to emit 
to multiple streams (of different types!) with a ProcessFunction.


On 10.09.2017 22:15, AndreaKinn wrote:

Hi,
I have a data stream resulting from an operation executed on a data stream
of data.
Essentially I want to obtain two different streams from that one to send
their to different cassandra tables.

I.e.:

datastream 0 composed by Tuple3

I want to have:

  a datastream 1 composed by every triple  of datastream 0
where Val2 > X
and
a data stream 2 composed by every couple .

This lied me to have two datastreams with Tuples of different arity (3 and
2).

Currently I have implemented it getting the 0 datastream and then calling
separately a map function to retrieve datastream 2 and a flatmap function to
retrieve datastream 1. So I have two different prepared statement of
Cassandra called on the two different streams.
It works fine.

However this solution looks really awful and inefficient; is there a more 
elegant alternative?

I tried also to send towards Cassandra a datastream and
select in the statement just two values (in this way I should use just the
flatmap operator) but during the execution raise an exception on it.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: EASY Friday afternoon question: order of chained sink operator execution in a streaming task

2017-09-29 Thread Chesnay Schepler

Yes, i believe that is correct.

On 29.09.2017 14:01, Martin Eden wrote:

Hi all,

Just a quick one.

I have a task that looks like this (as printed in the logs):

17-09-29 0703510695 INFO TaskManager.info: Received task Co-Flat Map 
-> Process -> (Sink: sink1, Sink: sink2, Sink: sink3) (2/2)


After looking a bit at the code of the streaming task I suppose the 
sink operators are chained for each subtask and synchronously executed 
one after the other in the order I specified them in code (which does 
correspond to the order in the log message).


A particular subtask does something like this on one thread (just 
focusing on sinks):


time record invocations
0  record1   sink1.invoke(record1)
1 sink2.invoke(record1)
2 sink3.invoke(record1)
3  record2   sink1.invoke(record2)
4 sink2.invoke(record2)
5 sink3.invoke(record2)
.
.
.

Is that correct?

Thanks





Re: how many 'run -c' commands to start?

2017-09-30 Thread Chesnay Schepler
The order in which you pass program arguments is quite important. Can 
you try again putting --detached directly after run?

I.e. flink run --detached -c ...

The reason being that arguments after the jar are treated as arguments 
for the user-program, and not as arguments for the job execution.


On 29.09.2017 17:23, r. r. wrote:

Sure, here is the cmdline output:

flink-1.3.2/bin/flink run -c com.corp.flink.KafkaJob quickstart.jar --topic 
InputQ --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 
--group.id Consumers -p 5 --detached
Cluster configuration: Standalone cluster with JobManager at 
localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: 5cc74547361cec2ff9874764dac9ee91. Waiting for job 
completion.
Connected to JobManager at 
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-100905913] with leader 
session id ----.
09/28/2017 16:30:40Job execution switched to status RUNNING.
09/28/2017 16:30:40Source: Custom Source -> Map(1/5) switched to SCHEDULED
09/28/2017 16:30:40Source: Custom Source -> Map(2/5) switched to SCHEDULED
09/28/2017 16:30:40Source: Custom Source -> Map(3/5) switched to SCHEDULED
09/28/2017 16:30:40Source: Custom Source -> Map(4/5) switched to SCHEDULED
09/28/2017 16:30:40Source: Custom Source -> Map(5/5) switched to SCHEDULED
09/28/2017 16:30:40Sink: Unnamed(1/5) switched to SCHEDULED
09/28/2017 16:30:40Sink: Unnamed(2/5) switched to SCHEDULED
09/28/2017 16:30:40Sink: Unnamed(3/5) switched to SCHEDULED
09/28/2017 16:30:40Sink: Unnamed(4/5) switched to SCHEDULED
09/28/2017 16:30:40Sink: Unnamed(5/5) switched to SCHEDULED
...

I thought --detach will put the process in the background, and give me back the 
cmdline, but maybe I got the meaning behind this option wrong?

Thank you!







> ---- Original message ----
>From: Chesnay Schepler ches...@apache.org
>Subject: Re: how many 'run -c' commands to start?
>To: user@flink.apache.org
>Sent: 29.09.2017 18:01

The only nodes that matter are those on which the Flink processes, i.e
Job- and TaskManagers, are being run.

To prevent a JobManager node failure from causing the job to fail you have 
to look into an HA setup.
(The jobmanager is responsible for distributing/coordinating work)

In case of a TaskManager node failure the job will fail and restart, 
provided that enough TaskManagers are still alive to satisfy the resource 
requirements of the job.

Can you elaborate a bit more what happened when you used the --detached 
param?

On 28.09.2017 16:33, r. r. wrote:

Thank you, Chesnay
to make sure - should the node where the job has been submitted goes down, 
the processing will continue, I hope?
Do I need to ensure this by configuration?

btw I added --detached param to the run cmd, but it didn't go into 
background process as I would've expected. Am I guessing wrong?

Thanks!
Rob

> ---- Original message ----
>From: Chesnay Schepler ches...@apache.org
>Subject: Re: how many 'run -c' commands to start?
>To: user@flink.apache.org
>Sent: 28.09.2017 15:05

Hi!

Given a Flink cluster, you would only call `flink run ...` to submit a
job once; for simplicity i would submit it on the node where you started
the cluster. Flink will automatically distribute job across the cluster,
in smaller independent parts known as Tasks.

Regards,
Chesnay

On 28.09.2017 08:31, r. r. wrote:

Hello

I successfully ran a job with 'flink run -c', but this is for the local
setup.
How should i proceed with a cluster? Will flink automagically instantiate
the job on all servers - i hope i don't have to start 'flink run -c' on all
machines.
New to flink and bigdata, so sorry for the probably silly question

Thanks!
Rob




Re: how many 'run -c' commands to start?

2017-09-29 Thread Chesnay Schepler
The only nodes that matter are those on which the Flink processes,  i.e 
Job- and TaskManagers 
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/runtime.html#job-managers-task-managers-clients>, 
are being run.


To prevent a JobManager node failure from causing the job to fail you 
have to look into an HA setup 
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html>.

(The jobmanager is responsible for distributing/coordinating work)

In case of a TaskManager node failure the job will fail and restart 
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/restart_strategies.html>, 
provided that enough TaskManagers are

still alive to satisfy the resource requirements of the job.
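For reference, the restart behaviour can also be picked per job on the execution
environment; a minimal sketch (the concrete strategy and values are just an example):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // retry the job up to 3 times after a failure, waiting 10 seconds between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        // ... define sources, transformations and sinks here, then call env.execute(...)
    }
}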


Can you elaborate a bit more what happened when you used the --detached 
param?


On 28.09.2017 16:33, r. r. wrote:

Thank you, Chesnay
to make sure - should the node where the job has been submitted goes down, the 
processing will continue, I hope?
Do I need to ensure this by configuration?

btw I added --detached param to the run cmd, but it didn't go into background 
process as I would've expected. Am I guessing wrong?

Thanks!
Rob






  > Original message ----
  > From: Chesnay Schepler ches...@apache.org
  > Subject: Re: how many 'run -c' commands to start?
  > To: user@flink.apache.org
  > Sent on: 28.09.2017 15:05



  

Hi!
  
  

Given a Flink cluster, you would only call `flink run ...` to submit a
  

job once; for simplicity i would submit it on the node where you started
  

the cluster. Flink will automatically distribute job across the cluster,
  

in smaller independent parts known as Tasks.
  
  

Regards,
  

Chesnay
  
  

On 28.09.2017 08:31, r. r. wrote:
  

Hello
  
  

I successfully ran a job with 'flink run -c', but this is for the local
  
  

setup.
  
  

How should i proceed with a cluster? Will flink automagically instantiate
  
  

the job on all servers - i hope i don't have to start 'flink run -c' on all
  
  

machines.
  
  

New to flink and bigdata, so sorry for the probably silly question
  
  
  
  

Thanks!
  
  

Rob
  
  





Re: Custom Serializers

2017-09-28 Thread Chesnay Schepler

On 19.09.2017 11:39, nragon wrote:

createInstance(Object[] fields) at TupleSerializerBase seems not to be part
of TypeSerializer API.
Will I be losing any functionality? In what cases do you use this instead
of createInstance()?

// We use this in the Aggregate and Distinct Operators to create instances
// of immutable Tuples (i.e. Scala Tuples)

Thanks

Taken from TupleSerializerBase:

// We use this in the Aggregate and Distinct Operators to create instances
// of immutable Tuples (i.e. Scala Tuples)
public abstract T createInstance(Object[] fields);


On 27.09.2017 17:43, nragon wrote:

Should I use TypeSerializerSingleton if it is independent of the object which
it's serializing?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Generally, use TypeSerializerSingleton. There is virtually no reason to 
not use it. Do keep this section of the TypeSerializer javadoc in mind:


 * The methods in this class are assumed to be stateless, such that it is effectively thread safe.
 * Stateful implementations of the methods may lead to unpredictable side effects and will
 * compromise both stability and correctness of the program.
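For illustration, a minimal sketch of a stateless serializer built on
TypeSerializerSingleton, written against the 1.3-era TypeSerializer API (the exact
set of methods to override differs slightly between versions), using java.util.UUID
as the example type:

import java.io.IOException;
import java.util.UUID;

import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public final class UuidSerializer extends TypeSerializerSingleton<UUID> {

    public static final UuidSerializer INSTANCE = new UuidSerializer();

    @Override
    public boolean isImmutableType() {
        return true; // UUID is immutable, so copies can simply return the input
    }

    @Override
    public UUID createInstance() {
        return new UUID(0L, 0L);
    }

    @Override
    public UUID copy(UUID from) {
        return from;
    }

    @Override
    public UUID copy(UUID from, UUID reuse) {
        return from;
    }

    @Override
    public int getLength() {
        return 16; // fixed length: two longs
    }

    @Override
    public void serialize(UUID record, DataOutputView target) throws IOException {
        target.writeLong(record.getMostSignificantBits());
        target.writeLong(record.getLeastSignificantBits());
    }

    @Override
    public UUID deserialize(DataInputView source) throws IOException {
        return new UUID(source.readLong(), source.readLong());
    }

    @Override
    public UUID deserialize(UUID reuse, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        target.writeLong(source.readLong());
        target.writeLong(source.readLong());
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof UuidSerializer;
    }
}

The class holds no mutable state, which is exactly what the javadoc above asks for.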




Re: how many 'run -c' commands to start?

2017-09-28 Thread Chesnay Schepler

Hi!

Given a Flink cluster, you would only call `flink run ...` to submit a 
job once; for simplicity i would submit it on the node where you started 
the cluster. Flink will automatically distribute job across the cluster, 
in smaller independent parts known as Tasks.
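For example, from the machine where the cluster was started (the entry class,
parallelism and jar path below are only placeholders):

./bin/flink run -c com.example.MyStreamingJob -p 4 /path/to/my-job.jar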


Regards,
Chesnay

On 28.09.2017 08:31, r. r. wrote:

Hello

I successfully ran a job with 'flink run -c', but this is for the local

setup.

How should i proceed with a cluster? Will flink automagically instantiate

the job on all servers - i hope i don't have to start 'flink run -c' on all

machines.

New to flink and bigdata, so sorry for the probably silly question



Thanks!

Rob






Re: Aggregating metrics from different boxes

2017-08-24 Thread Chesnay Schepler
If you want to change under what identifier metrics are exported please 
have a look at scope formats in the flink documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#scope


If i understood you correctly the goal would be to remove the host and 
taskmanager parameters.
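For example, a task scope without those parameters could look roughly like this in
flink-conf.yaml (the exact keys and variables for your version are listed on the page
linked above):

metrics.scope.task: <job_name>.<task_name>.<subtask_index>
metrics.scope.operator: <job_name>.<operator_name>.<subtask_index>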


On 24.08.2017 10:45, Sridhar Chellappa wrote:

Folks,

I am using RichMapFunction to generate codahale like metrics from 
different taskmanagers spread across an N-Node cluster. When I see the 
visualizations (Grafana on InfluxDB), I see all of the metrics as 
separate streams ($host.$taskmanager.$uuid.$metricname).


I thought I can aggregate these metrics(counters from all boxes into 
one aggregate count) using Grafana but am Unable to do so. Is there 
way to change my code such that this is possible?






Re: datastream.print() doesn't works

2017-08-31 Thread Chesnay Schepler
If you call createLocalEnvironmentWithWebUI you don't need to start a 
cluster with the start-local.sh script,
you can run it from the IDE and it will start the web UI. If you submit 
a job to a cluster that was started outside the IDE

you can call getExecutionEnvironment as usual.
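A minimal sketch of that, assuming flink-runtime-web is on the classpath of your
project (the pipeline itself is just a stand-in for your job):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUiExample {
    public static void main(String[] args) throws Exception {
        // starts a local cluster together with the web UI when run from the IDE
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.fromElements("a", "b", "c").print();
        env.execute("local-webui-test");
    }
}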

Not sure why the errors are different, but the message generally occurs 
when you call execute() on an environment
where you never defined any operators on. Please check that you're not 
creating multiple environments.


If possible it might also be helpful to provide us (or me personally) 
with the code of your program.


On 30.08.2017 19:08, AndreaKinn wrote:

Hi, in the night uninstalling and re-installing maven and flink I solved my
issue.

I started the web dashboard using start-local.sh script and used
/createLocalEnvironmentWithWebUI(new Configuration())/ as you suggested.
Anyway when I start it in eclipse in the ui dashboard no running jobs are
showed (as I expected).
To run it via console with the console command:

./bin/flink run -c org.apache.flink.quickstart.StreamingJob
target/flink-java-project-0.1.jar

but an error appears:

No operators defined in streaming topology. Cannot execute.

Strangely, if I execute it in eclipse no errors appear. Do you know which
operators it refers??



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: datastream.print() doesn't works

2017-08-29 Thread Chesnay Schepler

The easiest explanation is that there is nothing to print.

Since print statements within the select function don't appear in the 
logs I assume

that the result of HTM.learn is empty.

Please check via the webUI or metrics whether any of these operations 
actually return records.


On 29.08.2017 13:19, AndreaKinn wrote:

Hi,
I have a simple datastream of a Tuple2. Unfortunately when I call the
print() method. No one output is showed although no errors or exceptions are
raised.
I want to highlight that I have also other data streams which are correctly
printed.

This is the stream:

DataStream<Tuple2<Double, Double>> result = HTM.learn(kafkaStream, new Harness.AnomalyNetwork())
        .select(new InferenceSelectFunction<Harness.KafkaRecord, Tuple2<Double, Double>>() {
            @Override
            public Tuple2<Double, Double> select(Tuple2<Harness.KafkaRecord, NetworkInference> inference) throws Exception {
                return new Tuple2<Double, Double>(3.1, inference.f1.getAnomalyScore());
            }
        });

I'm quite sure the problem isn't in the particular function described above
because effectively in inner functions of getAnomalyScore for example the
return value exists.

Anyway I'm not able to find the problem which doesn't allow to print the
stream so I thought to write here to discover if there are typical situation
where streams are not printed. How can I verify and debug the problem? Some
hints about it ? I really tried everything, also tried to print something in
the function select() for example, with no results



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/datastream-print-doesn-t-works-tp15223.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Atomic savepint and cancel

2017-08-29 Thread Chesnay Schepler

Hello,

Cancel with savepoint is in general not an atomic operation; it only 
guarantees that no other checkpoint will be completed between the 
savepoint and the job cancellation.
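For reference, that operation is triggered from the CLI roughly like this (the job id
and target directory are placeholders):

./bin/flink cancel -s hdfs:///flink/savepoints <jobID>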


You can only guarantee that no messages are sent out if you used a sink 
that supports exactly-once, which as far as i know, in the case of 
Kafka, is only possible with the upcoming 0.11 connector (PR). 



On 29.08.2017 14:11, Or Sher wrote:

Hi,

I'm a bit new to Flink and I'm trying to figure out what's the best 
way to make an upgrade for my current running topology without having 
duplicate messages being sent by the the sink. (One time prior the 
upgrade and one time after).


I thought that the "atomic" part of the savepoint & cancel suggested 
that i can just take a savepoint and cancel the job at the same time 
and later on start from that savepoint and that would be it.


Having tried that, it seems that I got many duplicated messages sent 
by the kafka producer sink again after the restore from savepoint.


Is that suppose to happen?
Did I misunderstood the "atomic" meaning?

Thanks,
Or.





Re: yarn and checkpointing

2017-08-29 Thread Chesnay Schepler

Checkpoints are only used for recovery during the job execution.

If the entire cluster is shut down and restarted you will need to take a 
savepoint and restore from that.
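In practice that amounts to something like the following (job id, savepoint path and
jar are placeholders):

./bin/flink savepoint <jobID> hdfs:///flink/savepoints
./bin/flink cancel <jobID>
# later, against the new cluster:
./bin/flink run -s hdfs:///flink/savepoints/savepoint-abc123 /path/to/your-job.jar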


On 29.08.2017 16:46, Gwenhael Pasquiers wrote:

Hi,

Is it possible to use checkpointing to restore the state of an app after a 
restart on yarn ?

 From what I've seen it looks like that checkpointing only works within a flink 
cluster life-time. However the yarn mode has one cluster per app, and (unless 
the app crashes and is automatically restarted by the restart-strategy) the 
over-yarn-cluster has the same life time as the app, so when we stop the app, 
we stop the cluster that will clean it's checkpoints.

So when the app is stopped, the cluster dies and cleans the checkpoints folder. 
Then of course it won't be able to restore the state at the next run.

When running flink on yarn are we supposed to cancel with savepoint and then 
restore from savepoint ?





Re: Set Savepoints configuration after cluster bootstrap

2017-08-29 Thread Chesnay Schepler

Hello,

it is not possible to permanently set the savepoint directory after the 
cluster has started, but the configured value can be overridden when 
taking a savepoint as described here.


On 29.08.2017 17:11, Jose Miguel Tejedor Fernandez wrote:

Hi,

I am using Flink v1.3.1.

My question is about how to set the configuration for the savepoints 
feature. As long as I know theconfiguration entry 
`state.savepoints.dir` must be set in the file 
|flink/conf/flink-conf.yaml|


But I would like to know if it is possible 
to programmatically set/modify that configuration entry after the 
cluster has started.

Cheers
Br
Jose M






Re: Default chaining & uid

2017-08-29 Thread Chesnay Schepler

Hello,

That depends a bit on the used version.

For 1.3 and above it does not affect chaining; the maps will be chained 
and setting the UIDs will work as if the maps weren't chained.


For 1.2, setting the UID on a chained operator is forbidden and will 
fail with an exception.
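For illustration, a minimal sketch with two chained maps that each get a UID (the map
functions themselves are just placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidChainingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
                // the two maps remain chained; the uid only pins the operator id for savepoints
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                }).uid("first-map")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value + "!";
                    }
                }).uid("second-map")
                .print();
        env.execute("uid-chaining-example");
    }
}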


On 28.08.2017 21:57, Emily McMahon wrote:
Does setting uid affect the default chaining (ie if I have two maps in 
a row and set uid on both)?


This makes me think there's no effect


All operators that are part of a chain should be assigned an ID as
described in the Matching Operator State section above.



Thanks,
Emily





Re: Flink session on Yarn - ClassNotFoundException

2017-08-29 Thread Chesnay Schepler

Hello,

The ClassNotFoundException indicates that you are using a Flink version 
that wasn't compiled against hadoop 2.7.
Replacing one part of the hadoop dependency will most likely not cut it 
(or fail mysteriously down the line),
so i would suggest to check the downloads page and pick the binary 
matching your hadoop version.


It should be enough to replace the flink-shaded-hadoop2-uber-1.3.2.jar 
under /lib.


On 29.08.2017 11:46, Albert Giménez wrote:

Hi,

I’m trying to start a flink (1.3.2) session as explained in the docs 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#start-a-session), 
but I keep getting the “ClassNotFoundException” you can see below.


I’m running an HDInsight cluster on Azure (HDFS and YARN v2.7.3), and 
I’m exporting my HADOOP_CONF_DIR before running the yarn-session command.


Error while deploying YARN cluster: Couldn't deploy Yarn cluster
java.lang.RuntimeException: Couldn't deploy Yarn cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:443)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:630)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:486)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:483)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)

at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:483)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider 
not found

at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
at 
org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:161)

at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
at 
org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:381)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:458)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:441)

... 9 more
Caused by: java.lang.RuntimeException: 
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider 
not found

at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
... 17 more
Caused by: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider 
not found
at 
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)

at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
... 18 more


So far, I’ve checked that the yarn class path is correct (it is), I also 
tried manually linking the “hadoop-yarn-common" jar from my /usr/hdp 
directories into Flink’s “lib” directory. In that case, I get an 
“IllegalAccessError”:


Exception in thread "main" java.lang.IllegalAccessError: tried to 
access method 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; 
from class 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
at 
org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
at 
org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:163)

at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:94)
at 
org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:187)
at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.getYarnClient(AbstractYarnClusterDescriptor.java:381)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:458)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:441)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:630)
at 

Re: datastream.print() doesn't works

2017-08-29 Thread Chesnay Schepler

I'm afraid I don't know anything about eclipse.

Running mvn clean package will first delete all files in the /target 
directory, and then recompile your code to that very location.
It shouldn't affect an IDE in a way that isn't resolvable by rebuilding 
the project in it.
You could also try re-importing your code as a new project and see if it 
helps.


Note that you can start a Flink job with a functioning web UI from the 
IDE by calling 
`StreamExecutionEnvironment#createLocalEnvironmentWithWebUI()`, it is 
not necessary to build a jar.


On 29.08.2017 22:00, AndreaKinn wrote:

Chesnay Schepler wrote

The easiest explanation is that there is nothing to print.

Since print statements within the select function don't appear in the
logs I assume
that the result of HTM.learn is empty.

Please check via the webUI or metrics whether any of these operations
actually return records.

On 29.08.2017 13:19, AndreaKinn wrote:

Hi,
I have a simple datastream of a Tuple2. Unfortunately when I call the
print() method. No one output is showed although no errors or exceptions
are
raised.
I want to highlight that I have also other data streams which are
correctly
printed.

This is the stream:

DataStream<Tuple2<Double, Double>> result = HTM.learn(kafkaStream, new Harness.AnomalyNetwork())
        .select(new InferenceSelectFunction<Harness.KafkaRecord, Tuple2<Double, Double>>() {
            @Override
            public Tuple2<Double, Double> select(Tuple2<Harness.KafkaRecord, NetworkInference> inference) throws Exception {
                return new Tuple2<Double, Double>(3.1, inference.f1.getAnomalyScore());
            }
        });

I'm quite sure the problem isn't in the particular function described
above
because effectively in inner functions of getAnomalyScore for example the
return value exists.

Anyway I'm not able to find the problem which doesn't allow to print the
stream so I thought to write here to discover if there are typical
situation
where streams are not printed. How can I verify and debug the problem?
Some
hints about it ? I really tried everything, also tried to print something
in
the function select() for example, with no results



--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/datastream-print-doesn-t-works-tp15223.html
Sent from the Apache Flink User Mailing List archive. mailing list
archive at Nabble.com.


I'm losing my head in 6 hours...
To try to start flink job in webUI I tried to generate the jar of my flink
program using mvn clean package command.
I had to solve several problems about external libraries, then the command
worked but now in Eclipse everything is fucked. I have many errors something
like ClassNotFoundException. I tried everything but I'm not able to rewind
the program to the last functioning configuration. I'm desperate...

I'm not an expert of maven, what it does exactly and how can I cancel the
clean operation to go back to the previous config?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/datastream-print-doesn-t-works-tp15223p15237.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





Re: Classloader error after SSL setup

2017-10-04 Thread Chesnay Schepler
I don't think this is a configuration problem, but a bug in Flink. But 
we'll have to dig a little deeper to be sure.


Besides the actual SSL problem, what concerns me is that we didn't fail 
earlier. If a bug in the SSL setup prevents
the up- or download of jars then we should fail earlier. Looping in Nico 
who may have some input.


On 04.10.2017 22:58, Aniket Deshpande wrote:

Hi Chesnay,
Thanks for the reply. After your suggestion, I found out that setting 
blob.service.ssl.enabled: false solved the issue and now all the 
pipelines run as expected.

So, the issue is kinda narrowed down to blob service ssl now.
I also checked the jobmanager logs when blob ssl is enabled and I see 
the following error:
2017-10-03 23:28:50.459 [BLOB connection for /:46932] ERROR 
org.apache.flink.runtime.blob.BlobServerConnection  - Error while 
executing BLOB connection.
javax.net.ssl.SSLHandshakeException: Received fatal alert: 
certificate_unknown

at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.Alerts.getSSLException(Alerts.java:154)
at 
sun.security.ssl.SSLSocketImpl.recvAlert(SSLSocketImpl.java:2023)
at 
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1125)
at 
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375) 

at 
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:928)

at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:71)
at 
org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:119) 
So, is there some additional steps that I have to follow for enabling 
SSL for blob service?


On Wed, Oct 4, 2017 at 4:09 PM, Eron Wright <eronwri...@gmail.com> wrote:


By following Chesney's recommendation we will hopefully uncover an
SSL error that is being masked. Another thing to try is to disable
hostname verification (it is enabled by default) to see whether
the certificate is being rejected.

On Wed, Oct 4, 2017 at 5:15 AM, Chesnay Schepler
<ches...@apache.org> wrote:

something that would also help us narrow down the problematic
area is to enable SSL for one component at a time and see
which one causesd the job to fail.


    On 04.10.2017 14:11, Chesnay Schepler wrote:

The configuration looks reasonable. Just to be sure, are the
paths accessible by all nodes?

As a first step, could you set the logging level to DEBUG (by
modifying the 'conf/log4j.properties' file), resubmit the job
(after a cluster restart) and check the Job- and TaskManager
logs for any exception?

On 04.10.2017 03:15, Aniket Deshpande wrote:

Background: We have a setup of Flink 1.3.1 along with a
secure MAPR cluster (Flink is running on mapr client nodes).
We run this flink cluster via flink-jobmanager.sh foreground and
flink-taskmanager.sh foreground command via Marathon. In order for us to make
this work, we had to add -Djavax.net.ssl.trustStore="$JAVA_HOME/jre/lib/security/cacerts" in
flink-console.sh as extra JVM arg
(otherwise, flink was taking MAPR's ssl_truststore as
default truststore and then we were facing issues for any
3rd party jars like aws_sdk etc.). This entire setup was
working fine as it is and we could submit our jars and the
pipelines ran without any problem


Problem: We started experimenting with enabling ssl for all
communication for Flink. For this, we followed
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/security-ssl.html
for generating CA and keystore. I added the following properties
to flink-conf.yaml:


security.ssl.enabled: true
security.ssl.keystore: /opt/flink/certs/node1.keystore
security.ssl.keystore-password: 
security.ssl.key-password: 
security.ssl.truststore: /opt/flink/certs/ca.truststore
security.ssl.truststore-password: 
jobmanager.web.ssl.enabled: true
taskmanager.data.ssl.enabled: true
blob.service.ssl.enabled: true
akka.ssl.enabled: true


We then spun up a cluster and tried submitting the same job
which was working before. We get the following errors:
org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot load user class:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

ClassLoader i

Re: Flink 1.3.2 Netty Exception

2017-10-11 Thread Chesnay Schepler
I can confirm that the issue is reproducible with the given test, from 
the command-line and IDE.


While cutting down the test case, by replacing the outputformat with a 
DiscardingOutputFormat and the JDBCInputFormat with a simple collection, 
i stumbled onto a new Exception after ~200 iterations:


org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

Caused by: java.io.IOException: Insufficient number of network buffers: 
required 4, but only 1 available. The total number of network buffers is 
currently set to 5691 of 32768 bytes each. You can increase this number by 
setting the configuration keys 'taskmanager.network.memory.fraction', 
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:195)
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:186)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:602)
at java.lang.Thread.run(Thread.java:745)
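(For completeness: if that limit were actually too low in a real deployment, the keys
named in the message can be raised in flink-conf.yaml; the values below are purely
illustrative.)

taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.min: 134217728
taskmanager.network.memory.max: 1073741824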


On 11.10.2017 12:48, Flavio Pompermaier wrote:

Hi to all,
we wrote a small JUnit test to reproduce a memory issue we have in a 
Flink job (that seems related to Netty) . At some point, usually 
around the 28th loop, the job fails with the following exception 
(actually we never faced that in production but maybe is related to 
the memory issue somehow...):


Caused by: java.lang.IllegalAccessError: 
org/apache/flink/runtime/io/network/netty/NettyMessage
at 
io.netty.util.internal.__matchers__.org.apache.flink.runtime.io.network.netty.NettyMessageMatcher.match(NoOpTypeParameterMatcher.java)
at 
io.netty.channel.SimpleChannelInboundHandler.acceptInboundMessage(SimpleChannelInboundHandler.java:95)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)

... 16 more

The github project is https://github.com/okkam-it/flink-memory-leak 
and the JUnit test is contained in the MemoryLeakTest class (within 
src/main/test).


Thanks in advance for any support,
Flavio





Re: Decouple Kafka partitions and Flink parallelism for ordered streams

2017-10-11 Thread Chesnay Schepler
I couldn't find a proper solution for this. The easiest solution might be to use the
Async I/O
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html>,
and do the validation with an ExecutorService or similar in the map function.
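A rough sketch of that suggestion, written against the 1.3/1.4-era Async I/O API (the
collector class was renamed in later releases, and the validation logic is only a
placeholder):

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class ValidatingAsyncFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(8); // per-subtask worker pool
    }

    @Override
    public void asyncInvoke(String input, AsyncCollector<String> collector) {
        executor.submit(() -> {
            // heavy deserialization/validation happens off the task thread
            String validated = validate(input);
            collector.collect(Collections.singletonList(validated));
        });
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    private String validate(String input) {
        return input; // placeholder for the actual Avro validation
    }

    public static DataStream<String> apply(DataStream<String> source) {
        // orderedWait keeps the input order per subtask, so per-partition ordering is preserved
        return AsyncDataStream.orderedWait(source, new ValidatingAsyncFunction(),
                30, TimeUnit.SECONDS, 100);
    }
}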

I've CC'd aljoscha, maybe he has another idea.

The local partitioning solution is, theoretically, not impossible to do, 
but it will not work with all sources and interact oddly with 
checkpoints/savepoints when changing parallelism.


Given a source parallelism S, and a map parallelism M, the idea is to 
create S sub-plans,
each consisting of a distinct source and M map functions, and ensuring 
that each runs

together (the latter part flink should already take care of).

something like:

for i in S:
source = createSeparateSource().setParallelism(1)
partitioned = source.partitionCustom(...)
partitions = []
for j in M:

partitions.add(partitioned.map(...).setParallelism(1).disableChaining())
union(partitions).write(...)

This probably doesn't work with Kafka, since distinct kafka sources 
cannot cooperate in distributing partitions AFAIK.
It also simply obliterates the concept of parallelism, which will make 
modifications to the parallelism quite a pain when

checkpointing is enabled.

I've written a sample job that uses side-outputs to do the partitioning 
(since this was the first thing that came to mind),
attached below. Note that I essentially only wrote it to see what would 
actually happen.


public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<DataStream<String>> sources = new ArrayList<>();
    for (int x = 0; x < 6; x++) {
        sources.add(env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                for (String w : WORDS) {
                    ctx.collect(w);
                }
                while (true) {
                    Thread.sleep(5000);
                }
            }

            @Override
            public void cancel() {
            }
        }));
    }

    int numMaps = 4;
    for (int sourceIndex = 0; sourceIndex < sources.size(); sourceIndex++) {
        DataStream<String> source = sources.get(sourceIndex);

        List<OutputTag<String>> tags = new ArrayList<>(4);
        for (int x = 0; x < numMaps; x++) {
            tags.add(new OutputTag<String>(sourceIndex + "-" + x) {
            });
        }

        SingleOutputStreamOperator<String> partitioned = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                ctx.output(tags.get(value.hashCode() % tags.size()), value);
            }
        });

        List<DataStream<String>> toUnion = new ArrayList<>(tags.size());
        for (OutputTag<String> tag : tags) {
            toUnion.add(partitioned.getSideOutput(tag)
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) throws Exception {
                            return tag.toString() + " - " + value;
                        }
                    }).disableChaining());
        }

        DataStream<String> unionBase = toUnion.remove(0);
        unionBase = unionBase.union(toUnion.toArray(new DataStream[0]));
        unionBase.print();
    }

    // execute program
    env.execute("Theory");
}



On 11.10.2017 16:31, Chesnay Schepler wrote:
It is correct that keyBy and partition operations will distribute 
messages over the network
as they distribute the data across all subtasks. For this use-case we 
only want to consider

subtasks that are subsequent to our operator, like a local keyBy.

I don't think there is an obvious way to implement it, but I'm 
currently theory-crafting a bit

and will get back to you.

On 11.10.2017 14:52, Sanne de Roever wrote:

Hi,

Currently we need 75 Kafka partitions per topic and a parallelism of 
75 to meet required performance, increasing the partitions and 
parallelism gives diminished returns


Currently the performance is approx. 1500 msg/s per core, having one 
pipeline (source, map, sink) deployed as one instance per core.


The Kafka source performance is not an issue. The map is very heavy 
(deserialization, validation) on rather complex Avro messages. Object 
reuse is enabled.


Ideally we would like to decouple Flink processing parallelism from 
Kafka partitions in a following manner:


  * Pick a source parallelism
  * Per source, be able to pick a parallelism for the following map
  * In such a way that some message key determines which -local- map
instance gets a message from a certain visitor
  * So that messages with the same visitor key get processed by the
same map and in order for that visitor
  * Output the result to Kafka

AFAIK keyBy, partitionCustom will distribute messages over the 
network and rescale has no affinity for message identity.


Am I missing something obvious?

Cheers,

Sanne









