Writing an Integration test for flink-metrics

2017-10-11 Thread Colin Williams
I have a RichMapFunction and I'd like to ensure Meter fields are properly
incremented. I've been trying to think of the best way to do this.
Currently I think I'd need to either implement my own reporter (or use
JMX) that writes to a socket, then create a listener and wait for the
reporter to send the message.

Is this a good approach for writing the test, or should I be considering
something else?
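
For reference, a minimal sketch (not from this thread) of one alternative to a socket-based reporter: an in-memory reporter that a test can register through the metrics.reporter.<name>.class configuration and then assert against. The class and map names are assumptions.

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Collects every registered metric in a static map so that a test running the
// job in the same JVM can look up a Meter by its identifier and assert on it.
public class CollectingReporter implements MetricReporter {

    public static final Map<String, Metric> METRICS = new ConcurrentHashMap<>();

    @Override
    public void open(MetricConfig config) {}

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        METRICS.put(group.getMetricIdentifier(metricName), metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        METRICS.remove(group.getMetricIdentifier(metricName));
    }
}

After the job has run, the test would look up the Meter in CollectingReporter.METRICS and assert on Meter#getCount().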


Re: RichMapFunction parameters in the Streaming API

2017-10-11 Thread Colin Williams
Thanks for the detailed explanation regarding the reasoning behind not
using open()'s configuration parameters!
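
For reference, a minimal sketch (not from the thread) of the constructor approach Chesnay describes below; the parameter name and types are made up.

import org.apache.flink.api.common.functions.RichMapFunction;

// The parameter is passed through the constructor and stored in a field; it is
// serialized together with the function, so open() needs no Configuration.
public class ThresholdMapper extends RichMapFunction<Integer, Integer> {

    private final int threshold;

    public ThresholdMapper(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public Integer map(Integer value) {
        return value > threshold ? value : 0;
    }
}

// usage: stream.map(new ThresholdMapper(params.getInt("threshold", 10)))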

On Wed, Oct 11, 2017 at 1:46 AM, Chesnay Schepler 
wrote:

> The Configuration parameter in open() is a relic of the previous java API
> where operators were instantiated generically.
>
> Nowadays, this is no longer the case as they are serialized instead, which
> simplifies the passing of parameters as you can
> simply store them in a field of your UDF.
>
> The configuration object passed to open() in case of the streaming API is
> always empty, and we don't plan
> to implement it since it provides little value due to the above.
>
> As such, we suggest to pass either the parameter tool, configuration
> instance or specific parameters through the constructor of user-defined
> functions and store them in a field. This applies both to the batch and
> streaming API.
>
> Personally i would stay away from the global configuration option as it is
> more brittle than the constructor approach, which makes
> it explicit that this function requires these parameters.
>
>
> On 11.10.2017 00:36, Colin Williams wrote:
>
> I was looking for withParameters(config) in the Streaming API today. I
> stumbled across the following thread.
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.
> com/withParameters-for-Streaming-API-td9332.html#a9333
>
> It appears that some of the StreamingAPI developers are in favor of
> removing the parameters from RichMapFunctions' open. However the best
> practices article
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_
> practices.html#using-the-parameters-in-your-flink-program
>
> Show examples of using both global configuration (where parameters are
> available from open) and withParameters(config) (which doesn't work from
> the Streaming API)
>
> I'm trying to make a decision regarding using global parameters with my
> Flink Streaming jobs.
>
> Is using the global configuration a good idea for parameters in the
> Streaming API or is this best practice just suggested for the Batch API?
>
> Is there a reason for the opinion of removing the configuration parameters
> from open?
>
>
>
>
>


Writing to an HDFS file from a Flink stream job

2017-10-11 Thread Isuru Suriarachchi
Hi all,

I'm just trying to use an HDFS file as the sink for my flink stream job. I
use the following line to do so.

stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo");


I have not set "fs.hdfs.hadoopconf" in my flink configuration as it should
work with the full hdfs file name according to [1].

However, it doesn't work as expected. The file foo is created on HDFS, but the
file is empty, and I don't see any error logs on the Flink side either. When I
use a normal file sink with a "file:///.." URL, it works fine and the data is
there in the file.

Do I need any other configuration to get this working?

Thanks,
Isuru

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs


Implement bunch of transformations applied to same source stream in Apache Flink in parallel and combine result

2017-10-11 Thread Andrey Salnikov
Hi!

Could you please help me? I'm trying to use Apache Flink for machine
learning tasks with external ensemble/tree libs like XGBoost, so my
workflow will be like this:

   - receive a single stream of data whose atomic event looks like a simple
   vector event=(X1, X2, X3...Xn); it can be imagined as POJO fields, so
   initially we have DataStream source=...
   - a lot of feature extraction code is applied to the same event
   source: feature1 = source.map(X1...Xn), feature2 = source.map(X1...Xn), etc. For
   simplicity, let DataStream feature(i) = source.map() for all features
   - then I need to create a vector with the extracted features (feature1,
   feature2, ...featureK); for now it will be 40-50 features, but I'm sure
   it will contain more items in future and can easily contain 100-500
   features or more
   - put these extracted features into dataset/table columns in a 10-minute
   window and run the final machine learning task on that 10-minute window of data

In simple words, I need to apply several quite different map operations to
the same single event in the stream and then combine the results from all map
functions into a single vector.

So for now I can't figure out how to implement the final reduce step and run
all feature extraction map jobs in parallel, if possible. I have spent several
days on the Flink docs site, YouTube videos, googling, and reading Flink's sources,
but it seems I'm really stuck here.

The easy solution here would be to use a single map operation and run each
feature extraction step sequentially, one by one, in a huge map body, and then
return the final vector (Feature1...FeatureK) for each input event. But that
seems crude and suboptimal.
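
For what it's worth, a sketch of that single-map variant (not from the original mail; the event representation and the extractFeatureN helpers are placeholders):

import org.apache.flink.api.common.functions.MapFunction;

// All feature extractions run sequentially inside one map and the full
// feature vector is returned for each input event.
public class ExtractAllFeatures implements MapFunction<double[], double[]> {

    @Override
    public double[] map(double[] event) {
        double[] features = new double[3]; // 40-50 in the real job
        features[0] = extractFeature1(event);
        features[1] = extractFeature2(event);
        features[2] = extractFeature3(event);
        return features;
    }

    // placeholders for the real feature extraction code
    private double extractFeature1(double[] e) { return e[0] * 2; }
    private double extractFeature2(double[] e) { return e[1] + e[2]; }
    private double extractFeature3(double[] e) { return Math.abs(e[0]); }
}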

Another solution is to use a join for each pair of features, since all feature
DataStreams have the same initial event and the same key and only apply some
transformation code, but it looks ugly: writing 50 joins with some window.
And I think that joins and cogroups were developed for joining different streams
from different sources, not for such map/reduce operations.

It seems to me that for all these map operations there should be something simple
that I'm missing.

Could you please point me to how you guys implement such tasks in Flink,
if possible with example code?
PS: I posted this question to Stackoverflow.
PPS: If I use feature1.union(feature2...featureK) I still need to somehow
separate and combine the feature vectors before the sink, and preserve the order of
the final vectors.

Thanks,
Andrey


Re: NoResourceAvailable exception

2017-10-11 Thread AndreaKinn
The program is composed of:

6 Kafka /source/ connectors, each with a custom timestamp and watermark /extractor/
and a /map/ function.
Then I use 6 instances of an external library called flink-htm (quite heavy).
Moreover I have 6 /process/ methods and 2 /union/ methods to merge the result
streams.
Finally I have 2 Cassandra /sinks/.

The data arriving at Kafka are 1 KB strings, about one every 20 ms.

I'm absolutely sure that the flink-htm library is heavy, but I hoped Flink
would manage these instances by distributing the load (they are independent) across the
cluster... instead it seems like just one node takes all the load and
crashes.

If it helps, I can share my code.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: DataStream joining without window

2017-10-11 Thread Yan Zhou [FDS Science] ­
Thank you for the reply. It's very helpful.

Best
Yan

On Tue, Oct 10, 2017 at 7:57 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> Yes, using a TwoInputStreamOperator (or even better, a CoProcessFunction,
> because TwoInputStreamOperator is a low-level interface that might change
> in the future) is the recommended way for implementing a stream-stream
> join, currently.
>
> As you already guessed, you need a policy for cleaning up the state that
> you hold. You can do this using the timer features of CoProcessFunction.
> 
> Also, if you keep your buffered elements using the Flink state interfaces,
> you can switch the state backend to the RocksDB backend if you have
> concerns about the state growing too big.
>
> Best,
> Aljoscha
>
> > On 9. Oct 2017, at 23:11, Yan Zhou [FDS Science] ­ 
> wrote:
> >
> > It seems like flink only supports DataStream joining within same time
> window. Why is it restricted in this way?
> >
> > I think I can implement a TwoInputStreamOperator to join two DataStreams
> without considering the window.  And inside the operator, create two state
> to cache records of two streams and join the streams within methods
> processElement1/processElement2. Should I go head with this approach? Is
> there any performance consideration here? If the concern is that the cache
> might take a lot of memory, we can introduce some cache policy and reduce
> the size. Or can we use rocksDB state?
> >
> > Please advise.
> >
> > Best
> > Yan
> >
>
>
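
For reference, a minimal sketch (not from the thread) of the CoProcessFunction approach described above: both inputs are buffered in keyed state, each element is joined against the opposite buffer, and a processing-time timer implements a crude cleanup policy. The types, names and retention interval are assumptions.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Joins two keyed streams of strings without windows; buffered elements are
// dropped when a cleanup timer fires.
public class BufferedJoin extends CoProcessFunction<String, String, String> {

    private static final long RETENTION_MS = 60_000L; // assumed cleanup interval

    private transient ListState<String> leftBuffer;
    private transient ListState<String> rightBuffer;

    @Override
    public void open(Configuration parameters) {
        leftBuffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("left", String.class));
        rightBuffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("right", String.class));
    }

    @Override
    public void processElement1(String left, Context ctx, Collector<String> out) throws Exception {
        Iterable<String> rights = rightBuffer.get();
        if (rights != null) {
            for (String right : rights) {
                out.collect(left + "," + right);
            }
        }
        leftBuffer.add(left);
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + RETENTION_MS);
    }

    @Override
    public void processElement2(String right, Context ctx, Collector<String> out) throws Exception {
        Iterable<String> lefts = leftBuffer.get();
        if (lefts != null) {
            for (String left : lefts) {
                out.collect(left + "," + right);
            }
        }
        rightBuffer.add(right);
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + RETENTION_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Simplistic policy: drop everything buffered for this key once a timer fires.
        leftBuffer.clear();
        rightBuffer.clear();
    }
}

It would be applied to keyed connected streams, e.g. left.connect(right).keyBy(leftKey, rightKey).process(new BufferedJoin()).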


R/W traffic estimation between Flink and Zookeeper

2017-10-11 Thread Hao Sun
Hi, is there a way to estimate the read/write traffic between Flink and ZooKeeper?
I am looking for something like 1000 reads/sec or 1000 writes/sec, and the
size of the messages.

Thanks


Re: NoResourceAvailable exception

2017-10-11 Thread Aljoscha Krettek
Btw, what load are you putting on the cluster, i.e. what is your computation? 
If you don't have load, the cluster and job just keep on running, right?

Best,
Aljoscha

> On 19. Sep 2017, at 12:00, AndreaKinn  wrote:
> 
> Thank you, unfortunately it had no effects.
> 
> As I add more load to the computation, the error "taskmanager killed" appears
> on the node in use, without other nodes being called in to sustain the computation. 
> I also increased 
> 
> akka.watch.heartbeat.interval
> akka.watch.heartbeat.pause
> akka.transport.heartbeat.interval
> akka.transport.heartbeat.pause
> 
> obtaining just a (very) delayed error.
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Write each group to its own file

2017-10-11 Thread rlazoti
Hi,

Is there a way to write each group to its own file using the DataSet API
(batch)?

For example, let's use the following class:

case class Product(name: String, category: String)

And the following Dataset:

val products = env.fromElements(Product("i7", "cpu"), Product("R5", "cpu"),
Product("gtx1080", "gpu"), Product("vega64", "gpu"), Product("evo250gb",
"ssd"))

So in this example my output should be these 3 files:

- cpu.csv
i7, cpu
R5, cpu

- gpu.csv
gtx1080, gpu
vega64, gpu

- ssd.csv
evo250gb, ssd


I tried the following code, but got
org.apache.flink.api.common.InvalidProgramException: Task not serializable.

products.groupBy("category").reduceGroup { group: Iterator[Product] =>
  val items = group.toSeq
  env.fromElements(items).writeAsCsv(s"${items.head.category}.csv")
  items
}

I welcome any of your inputs.

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Decouple Kafka partitions and Flink parallelism for ordered streams

2017-10-11 Thread Chesnay Schepler
I couldn't find a proper solution for this. The easiest solution might 
be to use the Async I/O API and do the validation
with an ExecutorService or similar in the map function.
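
A rough sketch of that direction (not from the thread, assuming Flink 1.3's Async I/O API; the pool size, timeout and validation logic are made up): the heavy validation runs on a private thread pool behind an AsyncFunction.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Runs the expensive validation on its own thread pool so a single slow record
// does not block the subtask's main thread.
public class AsyncValidation extends RichAsyncFunction<String, String> {

    private transient ExecutorService pool;

    @Override
    public void open(Configuration parameters) {
        pool = Executors.newFixedThreadPool(8); // assumed pool size
    }

    @Override
    public void asyncInvoke(String record, AsyncCollector<String> collector) {
        pool.submit(() -> collector.collect(Collections.singletonList(validate(record))));
    }

    private String validate(String record) {
        return record; // placeholder for the heavy Avro deserialization/validation
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}

// usage (sketch):
// AsyncDataStream.unorderedWait(source, new AsyncValidation(), 10_000, TimeUnit.MILLISECONDS, 100);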

I've CC'd aljoscha, maybe he has another idea.

The local partitioning solution is, theoretically, not impossible to do, 
but it will not work with all sources and interact oddly with 
checkpoints/savepoints when changing parallelism.


Given a source parallelism S, and a map parallelism M, the idea is to 
create S sub-plans,
each consisting of a distinct source and M map functions, and ensuring 
that each runs
together (the latter part flink should already take care of).

something like:

for i in S:
    source = createSeparateSource().setParallelism(1)
    partitioned = source.partitionCustom(...)
    partitions = []
    for j in M:
        partitions.add(partitioned.map(...).setParallelism(1).disableChaining())
    union(partitions).write(...)

This probably doesn't work with Kafka, since distinct kafka sources 
cannot cooperate in distributing partitions AFAIK.
It also simply obliterates the concept of parallelism, which will make 
modifications to the parallelism quite a pain when
checkpointing is enabled.

I've written a sample job that uses side-outputs to do the partitioning 
(since this was the first thing that came to mind),
attached below. Note that I essentially only wrote it to see what would 
actually happen.


public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    List<DataStream<String>> sources = new ArrayList<>();
    for (int x = 0; x < 6; x++) {
        sources.add(env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                for (String w : WORDS) { ctx.collect(w); }
                while (true) { Thread.sleep(5000); }
            }
            @Override
            public void cancel() { }
        }));
    }
    int numMaps = 4;
    for (int sourceIndex = 0; sourceIndex < sources.size(); sourceIndex++) {
        DataStream<String> source = sources.get(sourceIndex);
        List<OutputTag<String>> tags = new ArrayList<>(4);
        for (int x = 0; x < numMaps; x++) {
            tags.add(new OutputTag<String>(sourceIndex + "-" + x) { });
        }
        SingleOutputStreamOperator<String> partitioned = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                ctx.output(tags.get(value.hashCode() % tags.size()), value);
            }
        });
        List<DataStream<String>> toUnion = new ArrayList<>(tags.size());
        for (OutputTag<String> tag : tags) {
            toUnion.add(partitioned.getSideOutput(tag).map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    return tag.toString() + " - " + value;
                }
            }).disableChaining());
        }
        DataStream<String> unionBase = toUnion.remove(0);
        unionBase = unionBase.union(toUnion.toArray(new DataStream[0]));
        unionBase.print();
    }
    // execute program
    env.execute("Theory");
}



On 11.10.2017 16:31, Chesnay Schepler wrote:
It is correct that keyBy and partition operations will distribute 
messages over the network
as they distribute the data across all subtasks. For this use-case we 
only want to consider
subtasks that are subsequent to our operator, like a local keyBy.

I don't think there is an obvious way to implement it, but I'm 
currently theory-crafting a bit
and will get back to you.

On 11.10.2017 14:52, Sanne de Roever wrote:

Hi,

Currently we need 75 Kafka partitions per topic and a parallelism of 
75 to meet required performance, increasing the partitions and 
parallelism gives diminished returns


Currently the performance is approx. 1500 msg/s per core, having one 
pipeline (source, map, sink) deployed as one instance per core.


The Kafka source performance is not an issue. The map is very heavy 
(deserialization, validation) on rather complex Avro messages. Object 
reuse is enabled.


Ideally we would like to decouple Flink processing parallelism from 
Kafka partitions in a following manner:


  * Pick a source parallelism
  * Per source, be able to pick a parallelism for the following map
  * In such a way that some message key determines which -local- map
instance gets a message from a certain visitor
  * So that messages with the same visitor key get processed by the
same map and in order for that visitor
  * Output the result to Kafka

AFAIK keyBy, partitionCustom will distribute messages over the 
network and rescale has no affinity for message identity.


Am I missing something obvious?

Cheers,

Sanne










Re: got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code

2017-10-11 Thread Aljoscha Krettek
Hi,

I think you can make it start the Web Frontend via

conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

In the future, this will become moot, though, when the JobManager has a proper 
REST API that is always there.

Best,
Aljoscha


> On 27. Sep 2017, at 11:40, XiangWei Huang  wrote:
> 
> Hi Till,
>   
>   I’ve found that a StandaloneMiniCluster doesn’t start up a web frontend 
> when it is running, so how can I cancel a running job on it with a RESTful 
> method?
> 
> Cheers,
> Till
> 
>> On 20 September 2017, at 15:43, Till Rohrmann wrote:
>> 
>> Hi XiangWei,
>> 
>> programmatically there is no nice tooling yet to cancel jobs on a dedicated 
>> cluster. What you can do is to use Flink's REST API to issue a cancel 
>> command [1]. You have to send a GET request to the target URL 
>> `/jobs/:jobid/cancel`. In the future we will improve the programmatic job 
>> control which will allow you to do these kind of things more easily.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
>> 
>> Cheers,
>> Till
>> 
>> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang  
>> wrote:
>> Hi Till,
>>
>>  Thanks for your answer, it worked when I used StandaloneMiniCluster, but 
>> another problem is that I can’t find a way to cancel
>> a running Flink job without shutting down the cluster. For 
>> LocalFlinkMiniCluster I can do it with the code below:
>> 
>>for (job <- cluster.getCurrentlyRunningJobsJava()) {
>>   cluster.stopJob(job)
>>}
>> 
>>Is it possible to cancel a running Flink job without shutting down a 
>> StandaloneMiniCluster ?
>> 
>> Best Regards,
>> XiangWei
>> 
>> 
>> 
>>> On 14 September 2017, at 16:58, Till Rohrmann wrote:
>>> 
>>> Hi XiangWei,
>>> 
>>> the problem is that the LocalFlinkMiniCluster can no longer be used in 
>>> combination with a RemoteExecutionEnvironment. The reason is that the 
>>> LocalFlinkMiniCluster uses now an internal leader election service and 
>>> assigns leader ids to its components. Since this is an internal service it 
>>> is not possible to retrieve this information like it is the case with the 
>>> ZooKeeper based leader election services.
>>> 
>>> Long story short, the Flink Scala shell currently does not work with a 
>>> LocalFlinkMiniCluster and would have to be fixed to work properly together 
>>> with a local execution environment. Until then, I recommend starting a 
>>> local standalone cluster and let the code run there.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> 
>>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang  
>>> wrote:
>>> dear all,
>>> 
>>> Below is the code i execute:
>>> 
>>> import java.io._
>>> import java.net.{URL, URLClassLoader}
>>> import java.nio.charset.Charset
>>> import java.util.Collections
>>> import java.util.concurrent.atomic.AtomicBoolean
>>> 
>>> import com.netease.atom.common.util.logging.Logging
>>> import com.netease.atom.interpreter.Code.Code
>>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, 
>>> InterpreterUtils}
>>> import io.netty.buffer._
>>> import org.apache.flink.api.scala.FlinkILoop
>>> import org.apache.flink.client.CliFrontend
>>> import org.apache.flink.client.cli.CliFrontendParser
>>> import org.apache.flink.client.program.ClusterClient
>>> import org.apache.flink.configuration.{QueryableStateOptions, 
>>> Configuration, ConfigConstants, GlobalConfiguration}
>>> import org.apache.flink.runtime.akka.AkkaUtils
>>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, 
>>> LocalFlinkMiniCluster}
>>> 
>>> import scala.Console
>>> import scala.beans.BeanProperty
>>> import scala.collection.JavaConversions._
>>> import scala.collection.mutable
>>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>>> import scala.runtime.AbstractFunction0
>>> import scala.tools.nsc.Settings
>>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>> 
>>> class FlinkInterpreter extends Interpreter {
>>>   private var bufferedReader: Option[BufferedReader] = None
>>>   private var jprintWriter: JPrintWriter = _
>>>   private val config = new Configuration;
>>>   private var cluster: LocalFlinkMiniCluster = _
>>>   @BeanProperty var imain: IMain = _
>>>   @BeanProperty var flinkILoop: FlinkILoop = _
>>>   private var out: ByteBufOutputStream = null
>>>   private var outBuf: ByteBuf = null
>>>   private var in: ByteBufInputStream = _
>>>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>> 
>>>   override def isOpen: Boolean = {
>>> isRunning.get()
>>>   }
>>> 
>>>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>>> config.toMap.toMap.foreach(println)
>>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>>> config.setBoolean(QueryableStateOpti

Re: Using latency markers

2017-10-11 Thread Aljoscha Krettek
This is the Jira issue: https://issues.apache.org/jira/browse/FLINK-7608

> On 27. Sep 2017, at 12:22, Martin Eden  wrote:
> 
> Any follow-up on this? Jira? PR?
> 
> On Wed, Sep 13, 2017 at 11:30 AM, Tzu-Li (Gordon) Tai  > wrote:
> Hi Aitozi,
> 
> Yes, I think we haven’t really pin-pointed out the actual cause of the 
> problem, but if you have a fix for that and can provide a PR we can 
> definitely look at it! That would be helpful.
> Before opening a PR, also make sure to first open a JIRA for the issue (I 
> don’t think there is one yet for this issue).
> 
> Cheers,
> Gordon
> 
> On 13 September 2017 at 12:14:42 PM, aitozi (gjying1...@gmail.com 
> ) wrote:
> 
>> Hi, Aljoscha, 
>> 
>> The dashboard shows NaN just because the value of the latencyGauge is not 
>> numerical, so it can't be shown in the dashboard. I removed the other 
>> latency descriptions except the sink's, so I can see the latency in the dashboard. 
>> Do I need to post a PR? 
>> 
>> thanks, 
>> Aitozi 
>> 
>> 
>> 
>> -- 
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>>  
> 



Re: Flink Savepoint Config parameter

2017-10-11 Thread Aljoscha Krettek
Hi,

I'm assuming you're running via YARN? I think you need to add the -yid 
<yarnApplicationId> parameter to your incantation.

Best,
Aljoscha

> On 28. Sep 2017, at 10:54, ant burton  wrote:
> 
> Hey,
> 
> When running in EMR and taking a savepoint with 
> 
> flink cancel -s SAVEPOINT_DIR JOB_ID
> 
> results in the following error
> 
> Caused by: org.apache.flink.util.ConfigurationException: Config parameter 
> 'Key: 'jobmanager.rpc.address' , default: null (deprecated keys: [])' is 
> missing (hostname/address of JobManager to connect to).
> 
> Am I missing something shouldn't this have been set by the cluster?
> 
> Thanks
> 
> 



Re: Flink 1.3.2 Netty Exception

2017-10-11 Thread Ufuk Celebi
@Chesnay: Recycling of network resources happens after the tasks go
into state FINISHED. Since we are submitting new jobs in a local loop
here it can easily happen that the new job is submitted before enough
buffers are available again. At least, previously that was the case.

I'm CC'ing Nico who refactored the network buffer distribution
recently and who might have more details about this specific error
message.

@Nico: another question is why there seem to be more buffers available
but we don't assign them. I'm referring to this part of the error
message "5691 of 32768 bytes...".

On Wed, Oct 11, 2017 at 2:54 PM, Chesnay Schepler  wrote:
> I can confirm that the issue is reproducible with the given test, from the
> command-line and IDE.
>
> While cutting down the test case, by replacing the outputformat with a
> DiscardingOutputFormat and the JDBCInputFormat with a simple collection, i
> stumbled onto a new Exception after ~200 iterations:
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>
> Caused by: java.io.IOException: Insufficient number of network buffers:
> required 4, but only 1 available. The total number of network buffers is
> currently set to 5691 of 32768 bytes each. You can increase this number by
> setting the configuration keys 'taskmanager.network.memory.fraction',
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>   at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:195)
>   at
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:186)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:602)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> On 11.10.2017 12:48, Flavio Pompermaier wrote:
>
> Hi to all,
> we wrote a small JUnit test to reproduce a memory issue we have in a Flink
> job (that seems related to Netty) . At some point, usually around the 28th
> loop, the job fails with the following exception (actually we never faced
> that in production but maybe is related to the memory issue somehow...):
>
> Caused by: java.lang.IllegalAccessError:
> org/apache/flink/runtime/io/network/netty/NettyMessage
> at
> io.netty.util.internal.__matchers__.org.apache.flink.runtime.io.network.netty.NettyMessageMatcher.match(NoOpTypeParameterMatcher.java)
> at
> io.netty.channel.SimpleChannelInboundHandler.acceptInboundMessage(SimpleChannelInboundHandler.java:95)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
> ... 16 more
>
> The github project is https://github.com/okkam-it/flink-memory-leak and the
> JUnit test is contained in the MemoryLeakTest class (within src/main/test).
>
> Thanks in advance for any support,
> Flavio
>
>


Re: Question about checkpointing with stateful operators and state recovery

2017-10-11 Thread Aljoscha Krettek
Hi Frederico,

I'll try and give some answers:

1. Generally speaking, no. If you use keyed state, for example via 
RuntimeContext you don't need to implement CheckpointedFunction.

2. You don't have to set setCommitOffsetsOnCheckpoints(true), this only affects 
how offsets are committed to Kafka in case other systems want to check that 
offset. To get exactly once semantics you have two general paths: 1) your sink 
is idempotent, meaning it doesn't matter whether you write output multiple 
times 2) the sink has to be integrated with Flink checkpointing and 
transactions. 2) was not easily possible for Kafka until Kafka 0.11 introduced 
transaction support. Flink 1.4 will have a Kafka 0.11 producer that supports 
transactions so with that you can have end-to-end exactly once.

3. The advantage of externalised checkpoints is that they don't get deleted 
when you cancel a job. This is different from regular checkpoints, which get 
deleted when you manually cancel a job. There are plans to make all checkpoints 
"externalised" in Flink 1.4.

4. Yes, you are correct. :-)

Best,
Aljoscha
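
For illustration (a sketch, not from this thread), keyed state obtained through the RuntimeContext as mentioned in point 1; it is included in checkpoints automatically without implementing CheckpointedFunction. The names are made up.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per key; the ValueState below is part of every checkpoint
// automatically, so no CheckpointedFunction is needed.
public class CountPerKey extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        out.collect(updated);
    }
}

// usage: stream.keyBy(keySelector).flatMap(new CountPerKey())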

> On 28. Sep 2017, at 11:46, Federico D'Ambrosio 
>  wrote:
> 
> Hi, I've got a couple of questions concerning the topics in the subject:
> 
> 1. If an operator is getting applied on a keyed stream, do I still have 
> to implement the CheckpointedFunction trait and define the snapshotState and 
> initializeState methods, in order to successfully recover the state from a 
> job failure?
> 
> 2. While using a FlinkKafkaConsumer, enabling checkpointing allows 
> exactly once semantics end to end, provided that the sink is able to 
> guarantee the same. Do I have to set
> setCommitOffsetsOnCheckpoints(true)? How would someone implement exactly once 
> semantics in a sink?
> 
> 3. What are the advantages of externalized checkpoints and which are the 
> cases where I would want to use them?
>   
> 4. Let's suppose a scenario where: checkpointing is enabled every 10 
> seconds, I have a kafka consumer which is set to start from the latest 
> records, a sink providing at least once semantics and a stateful keyed 
> operator inbetween the consumer and the sink. Is it correct that, in case of 
> task failure, happens the following?
> - the kafka consumer gets reverted to the latest offset (does it 
> happen even if I don't set setCommitOffsetsOnCheckpoints(true)?)
> - the operator state gets reverted to the latest checkpoint
> - the sink is stateless so it doesn't really care about what happened
> - the stream restarts and probably some of the events coming to the 
> sink have already been processed before
> 
> Thank you for attention,
> Kind regards,
> Federico



Re: Decouple Kafka partitions and Flink parallelism for ordered streams

2017-10-11 Thread Chesnay Schepler
It is correct that keyBy and partition operations will distribute 
messages over the network
as they distribute the data across all subtasks. For this use-case we 
only want to consider
subtasks that are subsequent to our operator, like a local keyBy.

I don't think there is an obvious way to implement it, but I'm currently 
theory-crafting a bit
and will get back to you.

On 11.10.2017 14:52, Sanne de Roever wrote:

Hi,

Currently we need 75 Kafka partitions per topic and a parallelism of 
75 to meet required performance, increasing the partitions and 
parallelism gives diminished returns


Currently the performance is approx. 1500 msg/s per core, having one 
pipeline (source, map, sink) deployed as one instance per core.


The Kafka source performance is not an issue. The map is very heavy 
(deserialization, validation) on rather complex Avro messages. Object 
reuse is enabled.


Ideally we would like to decouple Flink processing parallelism from 
Kafka partitions in a following manner:


  * Pick a source parallelism
  * Per source, be able to pick a parallelism for the following map
  * In such a way that some message key determines which -local- map
instance gets a message from a certain visitor
  * So that messages with the same visitor key get processed by the
same map and in order for that visitor
  * Output the result to Kafka

AFAIK keyBy, partitionCustom will distribute messages over the network 
and rescale has no affinity for message identity.


Am I missing something obvious?

Cheers,

Sanne








Re: Windowing isn't applied per key

2017-10-11 Thread Tony Wei
Hi Marcus,

Yes, each key has its own windows managed independently, so the aggregation on
a window is the sum of the values for each key, not the sum of all elements.
You can imagine that each key has its own sliding window assigner that
decides which windows each element in that keyed stream belongs to, but all
keyed streams use the same strategy.
That is the definition of sliding windows in the Streaming API.
The definition you have in mind is not directly supported by Flink. One way
is to implement it yourself with a ProcessFunction.

Best Regards,
Tony Wei
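
For illustration (a sketch, not from this thread), a keyed sliding-window sum where each key is aggregated independently; the tuple layout and window sizes are made up.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

// The sum below is computed per key ("a" and "b" independently), because the
// sliding windows are assigned on the keyed stream, not on the whole stream.
public class KeyedSlidingWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("b", 2L),
                Tuple2.of("a", 3L), Tuple2.of("b", 4L))
           .keyBy(0)                                      // key by the first tuple field
           .timeWindow(Time.minutes(10), Time.minutes(1)) // sliding window per key
           .sum(1)                                        // sum of the second field, per key
           .print();

        // With a real unbounded source, a sum per key is emitted every minute.
        env.execute("keyed sliding window");
    }
}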

mclendenin 於 2017年10月11日 週三,下午8:52寫道:

> Hi Tony,
>
> In the documentation on keyed windows vs non-keyed it says that it will
> split the stream into parallel keyed streams with windows being executed in
> parallel across the keys. I would think that this would mean that each key
> has it's own window managed independently.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#keyed-vs-non-keyed-windows
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Finding things not seen in the last window

2017-10-11 Thread Aljoscha Krettek
Hi Ron,

I think your colleague might be able to do that using a ProcessFunction with 
MapState and timers. The MapState is used to determine if a record is new. 
Timers would be used to schedule emission and also to schedule cleanup of of 
entries from the  MapState. For doing cleanup, the entries in the MapState 
could have a timestamp that you check when a cleanup timer fires.

Best,
Aljoscha
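
A rough sketch of that idea (not from the thread): MapState remembers the names already seen and a processing-time timer evicts entries older than a retention period. It assumes the stream has been keyed (for example by some partitioning key); the names and retention value are made up.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Emits a name only the first time it is seen; seen names expire after RETENTION_MS.
public class FirstSeen extends ProcessFunction<String, String> {

    private static final long RETENTION_MS = 10 * 60 * 1000; // assumed retention

    private transient MapState<String, Long> seen; // name -> last-seen timestamp

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen", String.class, Long.class));
    }

    @Override
    public void processElement(String name, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        if (!seen.contains(name)) {
            out.collect(name); // new since the retention period started
        }
        seen.put(name, now);
        ctx.timerService().registerProcessingTimeTimer(now + RETENTION_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Drop entries that have not been refreshed within the retention period.
        List<String> expired = new ArrayList<>();
        Iterable<Map.Entry<String, Long>> entries = seen.entries();
        if (entries != null) {
            for (Map.Entry<String, Long> entry : entries) {
                if (entry.getValue() + RETENTION_MS <= timestamp) {
                    expired.add(entry.getKey());
                }
            }
        }
        for (String name : expired) {
            seen.remove(name);
        }
    }
}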


> On 30. Sep 2017, at 19:22, Haohui Mai  wrote:
> 
> Hi,
> 
> Assuming FLINK-6465 lands, will something like 
> 
> SELECT COUNT(*) FROM (SELECT FIRST_VALUE(names) FROM stream) GROUP BY 
> HOP(proctime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)
> 
> works?
> 
> ~Haohui
> 
> On Fri, Sep 29, 2017 at 6:52 PM Ron Crocker  > wrote:
> Hi -
> 
> I have a colleague who is trying to write a flink job that will determine 
> deltas from period to period. Let’s say the periods are 1 minutes. What he 
> would like to do is report in minute 2 those things that are new since from 
> minute 1, then in minute 3 report those things that are new also since minute 
> 1.
> 
> For example, consider the stream looks like
> minute | name
> ===|===
>  1 | abc
>  1 | def
>  2 | abc
>  2 | ghi
>  3 | abc
>  3 | def
>  4 | ghi
>  4 | jkl
> 
> What we would like to report is:
> minute | count | names
> ===|===|===
>  1 | 2 | abc, def
>  2 | 1 | ghi
>  3 | 0 |
>  4 | 1 | jkl
> 
> In minute 2, abc was already seen but ghi is new, so it gets reported out as 
> new. In minute 3, abc and def have already been seen, so there are no new 
> names, and again in minute 4 ghi has been seen but jkl is new, so we report 
> out the 1 new name.
> 
> I’m struggling to help and thought someone here might be able to help. I have 
> thought about merging two streams (the stream of new things and the stream of 
> the full set seen so far) but haven’t tried that yet. 
> 
> I welcome any of your inputs.
> 
> Thanks!
> 
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com 
> M: +1 630 363 8835 


Re: javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated for S3 access

2017-10-11 Thread Patrick Lucas
I thought it might be a CA certificates issue, but it looks like
openjdk:8-jre-alpine includes the proper certificates.

You could just test this to make sure: exec into the container and run curl
-v https://s3.amazonaws.com. You may have to run apk add --no-cache curl
first.

Apart from that, a search for "javax.net.ssl.SSLPeerUnverifiedException
aws" yielded a number of results—have you checked those out?

--
Patrick Lucas

On Wed, Oct 4, 2017 at 5:25 PM, Hao Sun  wrote:

> Here is what my docker file says:
>
> ENV FLINK_VERSION=1.3.2 \
> HADOOP_VERSION=27 \
> SCALA_VERSION=2.11 \
>
>
> On Wed, Oct 4, 2017 at 8:23 AM Hao Sun  wrote:
>
>> I am running Flink 1.3.2 with docker on kubernetes. My docker is using
>> openjdk-8, I do not have hadoop, the version is 2.7, scala is 2.11. Thanks!
>>
>> FROM openjdk:8-jre-alpine
>>
>>
>> On Wed, Oct 4, 2017 at 8:11 AM Chesnay Schepler 
>> wrote:
>>
>>> I've found a few threads where an outdated jdk version on the
>>> server/client may be the cause.
>>>
>>> Which Flink binary (specifically, for which hadoop version) are you
>>> using?
>>>
>>>
>>> On 03.10.2017 20:48, Hao Sun wrote:
>>>
>>> com.amazonaws.http.AmazonHttpClient   - Unable to 
>>> execute HTTP request: peer not authenticated
>>> javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated
>>> at 
>>> sun.security.ssl.SSLSessionImpl.getPeerCertificates(SSLSessionImpl.java:431)
>>>
>>>
>>>


Re: How flink monitor source stream task(Time Trigger) is running?

2017-10-11 Thread Aljoscha Krettek
I think this might not actually be resolved. What YunFan was referring to in 
the initial mail is the Thread factory that is used for the processing-time 
service: 
https://github.com/apache/flink/blob/5af463a9c0ff62603bc342a78dfd5483d834e8a7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L223
 


How likely is it that a ScheduledThreadPoolExecutor simply fails? I don't think 
we currently have a mechanism that checks whether this service is still alive 
and would actually start scheduled tasks.

Best,
Aljoscha


> On 4. Oct 2017, at 09:27, Piotr Nowojski  wrote:
> 
> You are welcome :)
> 
> Piotrek
> 
>> On Oct 2, 2017, at 1:19 PM, yunfan123  wrote:
>> 
>> Thank you. 
>> "If SourceFunction.run methods returns without an exception Flink assumes
>> that it has cleanly shutdown and that there were simply no more elements to
>> collect/create by this task. "
>> This sentence solve my confusion.
>> 
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 



Re: Flink 1.3.2 Netty Exception

2017-10-11 Thread Chesnay Schepler
I can confirm that the issue is reproducible with the given test, from 
the command-line and IDE.


While cutting down the test case, by replacing the outputformat with a 
DiscardingOutputFormat and the JDBCInputFormat with a simple collection, 
I stumbled onto a new exception after ~200 iterations:


org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

Caused by: java.io.IOException: Insufficient number of network buffers: 
required 4, but only 1 available. The total number of network buffers is 
currently set to 5691 of 32768 bytes each. You can increase this number by 
setting the configuration keys 'taskmanager.network.memory.fraction', 
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:195)
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:186)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:602)
at java.lang.Thread.run(Thread.java:745)


On 11.10.2017 12:48, Flavio Pompermaier wrote:

Hi to all,
we wrote a small JUnit test to reproduce a memory issue we have in a 
Flink job (that seems related to Netty) . At some point, usually 
around the 28th loop, the job fails with the following exception 
(actually we never faced that in production but maybe is related to 
the memory issue somehow...):


Caused by: java.lang.IllegalAccessError: 
org/apache/flink/runtime/io/network/netty/NettyMessage
at 
io.netty.util.internal.__matchers__.org.apache.flink.runtime.io.network.netty.NettyMessageMatcher.match(NoOpTypeParameterMatcher.java)
at 
io.netty.channel.SimpleChannelInboundHandler.acceptInboundMessage(SimpleChannelInboundHandler.java:95)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)

... 16 more

The github project is https://github.com/okkam-it/flink-memory-leak 
and the JUnit test is contained in the MemoryLeakTest class (within 
src/main/test).


Thanks in advance for any support,
Flavio





Decouple Kafka partitions and Flink parallelism for ordered streams

2017-10-11 Thread Sanne de Roever
Hi,

Currently we need 75 Kafka partitions per topic and a parallelism of 75 to
meet required performance, increasing the partitions and parallelism gives
diminished returns

Currently the performance is approx. 1500 msg/s per core, having one
pipeline (source, map, sink) deployed as one instance per core.

The Kafka source performance is not an issue. The map is very heavy
(deserialization, validation) on rather complex Avro messages. Object reuse
is enabled.

Ideally we would like to decouple Flink processing parallelism from Kafka
partitions in a following manner:

   - Pick a source parallelism
   - Per source, be able to pick a parallelism for the following map
   - In such a way that some message key determines which -local- map
   instance gets a message from a certain visitor
   - So that messages with the same visitor key get processed by the same
   map and in order for that visitor
   - Output the result to Kafka

AFAIK keyBy, partitionCustom will distribute messages over the network and
rescale has no affinity for message identity.

Am I missing something obvious?

Cheers,

Sanne


Re: Windowing isn't applied per key

2017-10-11 Thread mclendenin
Hi Tony,

In the documentation on keyed windows vs non-keyed it says that it will
split the stream into parallel keyed streams with windows being executed in
parallel across the keys. I would think that this would mean that each key
has it's own window managed independently.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#keyed-vs-non-keyed-windows
 




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Windows getting created only on first execution

2017-10-11 Thread Aljoscha Krettek
Hi,

When you are restoring from a savepoint (or checkpoint), the offsets in Kafka 
are completely ignored. Flink checkpoints the offset at the time the 
checkpoint/savepoint is taken, and that will be used as the read offset when 
restoring.

Best,
Aljoscha

> On 11. Oct 2017, at 12:58, Rahul Raj  wrote:
> 
> Changing the group id didn't work for me, instead using 
> setStartfromEarliest() on kafka consumer worked for me. But it created one 
> confusion, that is in case of failure if I start from a particular checkpoint 
> or savepoint will the application start reading the message from a particular 
> offset where checkpoint/savepoint was created or it will start reading from 
> the first record in Kafka partition?
> 
> Rahul Raj 
> 
> On 11 October 2017 at 15:44, Aljoscha Krettek  > wrote:
> Hi,
> 
> I think the problem is that your Kafka consumer has the same group-id across 
> those two runs. This means that it will pick up the last "read position" of 
> the previous run, and thus not read anything. If you change the group-id for 
> the second run you should be able to read your data again.
> 
> Best,
> Aljoscha
> 
> 
>> On 11. Oct 2017, at 06:19, Rahul Raj > > wrote:
>> 
>> Hi ,
>> 
>> I have written a program which reads data from Kafka, parses the json and 
>> does some reduce operation. The problem I am facing is, the program executes 
>> perfectly for the first time on a day. But when I kill the program and 
>> execute it again, an empty file is created. Even after compiling again and 
>> running, an empty file is created.
>> 
>> var kafkaConsumer = new FlinkKafkaConsumer08(
>> 
>>   params.getRequired("input-topic"),
>> 
>>   new SimpleStringSchema,
>> 
>>   params.getProperties)
>> 
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> 
>> 
>> var messageStream = 
>> env.addSource(kafkaConsumer).filter(t=>t.contains(pattern))
>> 
>> 
>> 
>> var mts = messageStream.assignTimestampsAndWatermarks(new 
>> AssignerWithPeriodicWatermarks[String] {
>> 
>>   var ts = Long.MinValue
>> 
>> 
>> 
>>   override def extractTimestamp(element: String, 
>> previousElementTimestamp: Long): Long = {
>> 
>> var timestamp = json_decode(element).toLong
>> 
>> ts = Math.max(timestamp,previousElementTimestamp)
>> 
>> timestamp
>> 
>>   }
>> 
>> 
>> 
>>   override def getCurrentWatermark(): Watermark = {
>> 
>> new Watermark(ts)
>> 
>>   }
>> 
>> })
>> 
>> var output = mts
>> 
>>   .keyBy(t=>json_decode(t))
>> 
>>   .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>> 
>>   .allowedLateness(Time.seconds(5))
>> 
>>   .reduce((v1,v2)=>v1+""+v2)
>> 
>> 
>> 
>> output.writeAsText(path).setParallelism(1)
>> 
>> 
>> 
>> I am using FileSystem as statebackend. I am assuming this problem is related 
>> to memory cleaning, but I don't understand what's happening.
>> 
>> Any help?
>> 
>> 
>> 
>> Rahul Raj
>> 
>> 
>> 
> 
> 



Re: Manual checkpoint

2017-10-11 Thread Rahul Raj
You can use Flink's REST API to get the job ids of running jobs and then
cancel them via CLI commands. You can enclose both things, i.e. getting the job
ids and the CLI command, in a single script.

Rahul Raj

On 11 October 2017 at 15:47, Aljoscha Krettek  wrote:

> Hi,
>
> Triggering a savepoint is currently not possible from within a job. The
> job would somehow have to emit a message that an outside system would pick
> up and then trigger a savepoint.
>
> Best,
> Aljoscha
>
> > On 10. Oct 2017, at 17:25, nragon 
> wrote:
> >
> > Can I trigger a checkpoint based on a specific event?
> > Meaning, if a given event arrives (containing EOF in this case) it would
> be
> > broadcasted to all downstream operators and trigger a savepoint
> aftewards.
> >
> > Thanks,
> > Nuno
> >
> >
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
>


Re: Windows getting created only on first execution

2017-10-11 Thread Rahul Raj
Changing the group id didn't work for me; instead, using
setStartFromEarliest() on the Kafka consumer worked for me. But it created one
point of confusion: in case of failure, if I start from a particular
checkpoint or savepoint, will the application start reading from the
offset at which the checkpoint/savepoint was created, or will it start
reading from the first record in the Kafka partition?

Rahul Raj

On 11 October 2017 at 15:44, Aljoscha Krettek  wrote:

> Hi,
>
> I think the problem is that your Kafka consumer has the same group-id
> across those two runs. This means that it will pick up the last "read
> position" of the previous run, and thus not read anything. If you change
> the group-id for the second run you should be able to read your data again.
>
> Best,
> Aljoscha
>
>
> On 11. Oct 2017, at 06:19, Rahul Raj  wrote:
>
> Hi ,
>
> I have written a program which reads data from Kafka, parses the json and
> does some reduce operation. The problem I am facing is, the program
> executes perfectly for the first time on a day. But when I kill the program
> and execute it again, an empty file is created. Even after compiling again
> and running, an empty file is created.
>
> var kafkaConsumer = new FlinkKafkaConsumer08(
>
>   params.getRequired("input-topic"),
>
>   new SimpleStringSchema,
>
>   params.getProperties)
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> var messageStream = env.addSource(kafkaConsumer).
> filter(t=>t.contains(pattern))
>
>
> var mts = messageStream.assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks[String] {
>
>   var ts = Long.MinValue
>
>
>   override def extractTimestamp(element: String,
> previousElementTimestamp: Long): Long = {
>
> var timestamp = json_decode(element).toLong
>
> ts = Math.max(timestamp,previousElementTimestamp)
>
> timestamp
>
>   }
>
>
>   override def getCurrentWatermark(): Watermark = {
>
> new Watermark(ts)
>
>   }
>
> })
>
> var output = mts
>
>   .keyBy(t=>json_decode(t))
>
>   .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>
>   .allowedLateness(Time.seconds(5))
>
>   .reduce((v1,v2)=>v1+""+v2)
>
>
> output.writeAsText(path).setParallelism(1)
>
>
> I am using FileSystem as statebackend. I am assuming this problem is
> related to memory cleaning, but I don't understand what's happening.
>
> Any help?
>
>
> Rahul Raj
>
>
>
>


Flink 1.3.2 Netty Exception

2017-10-11 Thread Flavio Pompermaier
Hi to all,
we wrote a small JUnit test to reproduce a memory issue we have in a Flink
job (that seems related to Netty). At some point, usually around the 28th
loop, the job fails with the following exception (actually we never faced
that in production, but maybe it is related to the memory issue somehow...):

Caused by: java.lang.IllegalAccessError:
org/apache/flink/runtime/io/network/netty/NettyMessage
at
io.netty.util.internal.__matchers__.org.apache.flink.runtime.io.network.netty.NettyMessageMatcher.match(NoOpTypeParameterMatcher.java)
at
io.netty.channel.SimpleChannelInboundHandler.acceptInboundMessage(SimpleChannelInboundHandler.java:95)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
... 16 more

The github project is https://github.com/okkam-it/flink-memory-leak and the
JUnit test is contained in the MemoryLeakTest class (within src/main/test).

Thanks in advance for any support,
Flavio


Re: Delete save point when using incremental checkpoint

2017-10-11 Thread Kien Truong
Thanks Stephan. That's exactly what I was asking about.

Best regards,
Kien

On Oct 11, 2017, at 16:59, Stephan Ewen wrote:
>Kien,
>
>I think what you are asking is: Do incremental checkpoints refer to
>chunks
>of state in a previous savepoint. Meaning that if the savepoint is
>deleted,
>will the checkpoint be missing some chunks.
>
>The answer is: No, savepoints are always full snapshots and incremental
>checkpoints do not reference and savepoint state chunks.
>
>Stephan
>
>
>On Wed, Oct 11, 2017 at 10:54 AM, Chesnay Schepler 
>wrote:
>
>> Hi,
>>
>> There is an important distinction between checkpoints (triggered by
>Flink,
>> may be incremental) and savepoints (manually triggered, always
>> self-contained).
>>
>> Your question is unfortunately mixing both terms, please expand which
>> you're referring to.
>>
>> Regards,
>> Chesnay
>>
>>
>> On 11.10.2017 10:31, Kien Truong wrote:
>>
>>> Hi,
>>>
>>> When using increment checkpoint mode, can I delete the save point
>that
>>> the job recovered from after sometime ? Or do I have to keep that
>>> checkpoint forever because it's a part of the snapshot chain ?
>>>
>>> Best regards,
>>> Kien
>>>
>>
>>
>>


Re: Manual checkpoint

2017-10-11 Thread Aljoscha Krettek
Hi,

Triggering a savepoint is currently not possible from within a job. The job 
would somehow have to emit a message that an outside system would pick up and 
then trigger a savepoint.

Best,
Aljoscha

> On 10. Oct 2017, at 17:25, nragon  wrote:
> 
> Can I trigger a checkpoint based on a specific event?
> Meaning, if a given event arrives (containing EOF in this case) it would be
> broadcasted to all downstream operators and trigger a savepoint aftewards.
> 
> Thanks,
> Nuno
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Subscribe

2017-10-11 Thread Aljoscha Krettek
Hi Stephen,

You would have to send a mail to "user-subscr...@flink.apache.org".

Best,
Aljoscha

> On 10. Oct 2017, at 20:08, Stephen Jiang  wrote:
> 
> 



Re: Windows getting created only on first execution

2017-10-11 Thread Aljoscha Krettek
Hi,

I think the problem is that your Kafka consumer has the same group-id across 
those two runs. This means that it will pick up the last "read position" of the 
previous run, and thus not read anything. If you change the group-id for the 
second run you should be able to read your data again.

Best,
Aljoscha
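
A small sketch of the two options mentioned in this thread (not from the original mails, assuming the Kafka 0.8 connector used in the quoted code; the addresses, topic and group id are made up):

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Two ways to avoid resuming from a previously committed position:
// use a fresh group.id, or ignore committed offsets with setStartFromEarliest().
public class ReReadTopic {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
        props.setProperty("zookeeper.connect", "localhost:2181"); // required by the 0.8 consumer
        props.setProperty("group.id", "my-job-run-2");            // option 1: a new group id

        FlinkKafkaConsumer08<String> consumer =
                new FlinkKafkaConsumer08<>("input-topic", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest(); // option 2: ignore committed offsets entirely

        env.addSource(consumer).print();
        env.execute("re-read topic");
    }
}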

> On 11. Oct 2017, at 06:19, Rahul Raj  wrote:
> 
> Hi ,
> 
> I have written a program which reads data from Kafka, parses the json and 
> does some reduce operation. The problem I am facing is, the program executes 
> perfectly for the first time on a day. But when I kill the program and 
> execute it again, an empty file is created. Even after compiling again and 
> running, an empty file is created.
> 
> var kafkaConsumer = new FlinkKafkaConsumer08(
> 
>   params.getRequired("input-topic"),
> 
>   new SimpleStringSchema,
> 
>   params.getProperties)
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
> 
> var messageStream = 
> env.addSource(kafkaConsumer).filter(t=>t.contains(pattern))
> 
> 
> 
> var mts = messageStream.assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[String] {
> 
>   var ts = Long.MinValue
> 
> 
> 
>   override def extractTimestamp(element: String, 
> previousElementTimestamp: Long): Long = {
> 
> var timestamp = json_decode(element).toLong
> 
> ts = Math.max(timestamp,previousElementTimestamp)
> 
> timestamp
> 
>   }
> 
> 
> 
>   override def getCurrentWatermark(): Watermark = {
> 
> new Watermark(ts)
> 
>   }
> 
> })
> 
> var output = mts
> 
>   .keyBy(t=>json_decode(t))
> 
>   .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
> 
>   .allowedLateness(Time.seconds(5))
> 
>   .reduce((v1,v2)=>v1+""+v2)
> 
> 
> 
> output.writeAsText(path).setParallelism(1)
> 
> 
> 
> I am using FileSystem as statebackend. I am assuming this problem is related 
> to memory cleaning, but I don't understand what's happening.
> 
> Any help?
> 
> 
> 
> Rahul Raj
> 
> 
> 



Re: RichMapFunction parameters in the Streaming API

2017-10-11 Thread Aljoscha Krettek
I think we should remove that part from the best-practices documentation. I'll 
quickly open a PR.

> On 11. Oct 2017, at 10:46, Chesnay Schepler  wrote:
> 
> The Configuration parameter in open() is a relic of the previous java API 
> where operators were instantiated generically.
> 
> Nowadays, this is no longer the case as they are serialized instead, which 
> simplifies the passing of parameters as you can
> simply store them in a field of your UDF.
> 
> The configuration object passed to open() in case of the streaming API is 
> always empty, and we don't plan
> to implement it since it provides little value due to the above.
> 
> As such, we suggest to pass either the parameter tool, configuration instance 
> or specific parameters through the constructor of user-defined functions and 
> store them in a field. This applies both to the batch and streaming API.
> 
> Personally i would stay away from the global configuration option as it is 
> more brittle than the constructor approach, which makes
> it explicit that this function requires these parameters.
> 
> On 11.10.2017 00:36, Colin Williams wrote:
>> I was looking for withParameters(config) in the Streaming API today. I 
>> stumbled across the following thread.
>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/withParameters-for-Streaming-API-td9332.html#a9333
>>  
>> 
>> 
>> It appears that some of the StreamingAPI developers are in favor of removing 
>> the parameters from RichMapFunctions' open. However the best practices 
>> article
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program
>>  
>> 
>> 
>> Show examples of using both global configuration (where parameters are 
>> available from open) and withParameters(config) (which doesn't work from the 
>> Streaming API)
>> 
>> I'm trying to make a decision regarding using global parameters with my 
>> Flink Streaming jobs.
>> 
>> Is using the global configuration a good idea for parameters in the 
>> Streaming API or is this best practice just suggested for the Batch API?
>> 
>> Is there a reason for the opinion of removing the configuration parameters 
>> from open?
>> 
>> 
>> 
> 



Re: Delete save point when using incremental checkpoint

2017-10-11 Thread Stephan Ewen
Kien,

I think what you are asking is: Do incremental checkpoints refer to chunks
of state in a previous savepoint. Meaning that if the savepoint is deleted,
will the checkpoint be missing some chunks.

The answer is: No, savepoints are always full snapshots and incremental
checkpoints do not reference any savepoint state chunks.

Stephan


On Wed, Oct 11, 2017 at 10:54 AM, Chesnay Schepler 
wrote:

> Hi,
>
> There is an important distinction between checkpoints (triggered by Flink,
> may be incremental) and savepoints (manually triggered, always
> self-contained).
>
> Your question is unfortunately mixing both terms, please expand which
> you're referring to.
>
> Regards,
> Chesnay
>
>
> On 11.10.2017 10:31, Kien Truong wrote:
>
>> Hi,
>>
>> When using increment checkpoint mode, can I delete the save point that
>> the job recovered from after sometime ? Or do I have to keep that
>> checkpoint forever because it's a part of the snapshot chain ?
>>
>> Best regards,
>> Kien
>>
>
>
>


Re: Delete save point when using incremental checkpoint

2017-10-11 Thread Chesnay Schepler

Hi,

There is an important distinction between checkpoints (triggered by 
Flink, may be incremental) and savepoints (manually triggered, always 
self-contained).


Your question is unfortunately mixing both terms, please expand which 
you're referring to.


Regards,
Chesnay

On 11.10.2017 10:31, Kien Truong wrote:

Hi,

When using increment checkpoint mode, can I delete the save point that 
the job recovered from after sometime ? Or do I have to keep that 
checkpoint forever because it's a part of the snapshot chain ?


Best regards,
Kien





Re: RichMapFunction parameters in the Streaming API

2017-10-11 Thread Chesnay Schepler
The Configuration parameter in open() is a relic of the previous java 
API where operators were instantiated generically.

Nowadays, this is no longer the case as they are serialized instead, 
which simplifies the passing of parameters as you can
simply store them in a field of your UDF.

The configuration object passed to open() in case of the streaming API 
is always empty, and we don't plan
to implement it since it provides little value due to the above.

As such, we suggest to pass either the parameter tool, configuration 
instance or specific parameters through the constructor of user-defined 
functions and store them in a field. This applies both to the batch and 
streaming API.

Personally i would stay away from the global configuration option as it 
is more brittle than the constructor approach, which makes
it explicit that this function requires these parameters.

On 11.10.2017 00:36, Colin Williams wrote:
I was looking for withParameters(config) in the Streaming API today. I 
stumbled across the following thread.


http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/withParameters-for-Streaming-API-td9332.html#a9333 



It appears that some of the StreamingAPI developers are in favor of 
removing the parameters from RichMapFunctions' open. However the best 
practices article


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program

Show examples of using both global configuration (where parameters are 
available from open) and withParameters(config) (which doesn't work 
from the Streaming API)


I'm trying to make a decision regarding using global parameters with 
my Flink Streaming jobs.


Is using the global configuration a good idea for parameters in the 
Streaming API or is this best practice just suggested for the Batch API?


Is there a reason for the opinion of removing the configuration 
parameters from open?








Delete save point when using incremental checkpoint

2017-10-11 Thread Kien Truong
Hi,

When using incremental checkpoint mode, can I delete the save point that the job 
recovered from after some time? Or do I have to keep that checkpoint forever 
because it's a part of the snapshot chain?

Best regards,
Kien