Re: kinesis producer setCustomPartitioner use stream's own data

2017-02-20 Thread Tzu-Li (Gordon) Tai
Hi Sathi,

The `getPartitionId` method is invoked with each record from the stream. In 
there, you can extract values / fields from the record, and use that to 
determine the target partition id.
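
For example, a minimal sketch (not from the original message) that assumes the records are JSON strings, uses `org.json` for parsing, and keys the partition on a hypothetical `accountId` field; the stream/field names and the producer properties are placeholders:

Properties producerConfig = new Properties();   // region, credentials, etc. go here

FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setDefaultStream("output-stream");      // placeholder stream name

kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
    @Override
    public String getPartitionId(String element) {
        // Invoked once per record: parse the JSON string and use one of its
        // fields (or a combination of several) as the partition key.
        JSONObject json = new JSONObject(element);
        return json.optString("accountId", "default");
    }
});

transformedStream.addSink(kinesis);             // the stream produced by the RichCoFlatMapFunction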

Is this what you had in mind?

Cheers,
Gordon

On February 21, 2017 at 11:54:21 AM, Sathi Chowdhury 
(sathi.chowdh...@elliemae.com) wrote:

Hi flink users and experts,

 

In my Flink processor I am trying to use the Flink Kinesis connector. I read from
a Kinesis stream, and after the transformation (for which I use a
RichCoFlatMapFunction), the JSON event needs to be written to a Kinesis stream k1.

DataStream myStream = see.addSource(new 
FlinkKinesisConsumer<>(inputStream, new MyDeserializationSchema(), 
consumerConfig));
 

 

For setting up the producer, including partitioning, I want to use
setCustomPartitioner, but the problem is that I don't know how to access the
parameters inside myStream. I have multiple fields that I want to extract from
the stream right there in the main method and use them to decide the
partition key. Is it possible to choose a partition key that is prepared from the
stream? If so, can you please share an example?

 

 


kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
    @Override
    public String getPartitionId(String element) {
        int l = element.length();   // here I want to bring values extracted from the stream
        return element.substring(l - 1, l);
    }
});

 

Thanks

Sathi


kinesis producer setCustomPartitioner use stream's own data

2017-02-20 Thread Sathi Chowdhury
Hi flink users and experts,

In my Flink processor I am trying to use the Flink Kinesis connector. I read from
a Kinesis stream, and after the transformation (for which I use a
RichCoFlatMapFunction), the JSON event needs to be written to a Kinesis stream k1.

DataStream myStream = see.addSource(new 
FlinkKinesisConsumer<>(inputStream, new MyDeserializationSchema(), 
consumerConfig));


For setting up the producer, including partitioning, I want to use
setCustomPartitioner, but the problem is that I don't know how to access the
parameters inside myStream. I have multiple fields that I want to extract from
the stream right there in the main method and use them to decide the
partition key. Is it possible to choose a partition key that is prepared from the
stream? If so, can you please share an example?



kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
    @Override
    public String getPartitionId(String element) {
        int l = element.length();   // here I want to bring values extracted from the stream
        return element.substring(l - 1, l);
    }
});

Thanks
Sathi


Re: Re: Transfer information from one window to the next

2017-02-20 Thread 施晓罡(星罡)
Hi Sonex,

All windows under the same key (e.g., TimeWindow(0, 3600) and TimeWindow(3600,
7200)) will be processed by the flatmap function. You can put the variable
drawn from TimeWindow(0, 3600) into a state. When you receive TimeWindow(3600,
7200), you can access the state and apply the function with the obtained
variable.
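
A minimal sketch of such a keyed flatmap with a ValueState (the String/Double types and the combination logic are placeholders, not taken from this thread):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keyed flatMap that remembers the value produced by the previous window of the same key.
// Input: (key, value) tuples emitted by the window's apply function.
public class CarryOverFunction
        extends RichFlatMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

    private transient ValueState<Double> previousWindowValue;

    @Override
    public void open(Configuration parameters) {
        previousWindowValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("prev-window-value", Double.class));
    }

    @Override
    public void flatMap(Tuple2<String, Double> windowResult,
                        Collector<Tuple2<String, Double>> out) throws Exception {
        Double previous = previousWindowValue.value();       // null for the first window of a key
        double combined = (previous == null ? 0.0 : previous) + windowResult.f1;
        previousWindowValue.update(windowResult.f1);          // visible to the next window of this key
        out.collect(Tuple2.of(windowResult.f0, combined));
    }
}

It would be applied after the window, e.g. stream.keyBy(...).timeWindow(...).apply(...).keyBy(0).flatMap(new CarryOverFunction()).
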
Regards,
Xiaogang
------------------------------------------------------------------
From: Sonex
Sent: Monday, February 20, 2017 19:54
To: user
Subject: Re: Re: Transfer information from one window to the next
I don't think you understood the question correctly. I do not care about
information between windows at the same time (i.e., start of window = 0, end
of window = 3600). I want to pass a variable, let's say for key 1, from the
apply function of window 0-3600 to the apply function of window 3600-7200,
for key 1.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11738p11739.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Watermarks per key

2017-02-20 Thread jganoff
There's nothing stopping me from assigning timestamps and generating watermarks on
a keyed stream in the implementation, and the KeyedStream API supports it. It
appears the underlying operator that gets created in
DataStream.assignTimestampsAndWatermarks() isn't key-aware and tracks
timestamps globally. So is that what's technically preventing per-key
timestamp assignment from working?

I'm curious to hear Aljoscha's thoughts on watermark management across keys.

Thanks!



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Watermarks-per-key-tp11628p11761.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Checkpointing with RocksDB as statebackend

2017-02-20 Thread vinay patil
Hi Stephan,

Just saw your mail while I was writing up the answers to your earlier
questions. I have attached some more screenshots which were taken from the
latest run today.
Yes, I will try to set it to a higher value and check if the performance improves.

Let me know your thoughts.

Regards,
Vinay Patil

On Tue, Feb 21, 2017 at 12:51 AM, Stephan Ewen [via Apache Flink User
Mailing List archive.]  wrote:

> @Vinay!
>
> Just saw the screenshot you attached to the first mail. The checkpoint
> that failed came after one that had an incredibly heavy alignment phase (14
> GB).
> I think that working that off threw off the next checkpoint, because the
> workers were still working through the alignment backlog.
>
> I think you can for now fix this by setting the minimum pause between
> checkpoints a bit higher (it is probably set a bit too small for the state
> of your application).
>
> Also, can you describe what your sources are (Kafka / Kinesis or file
> system)?
>
> BTW: We are currently working on
>   - incremental RocksDB checkpoints
>   - the network stack to allow in the future for a new way of doing the
> alignment
>
> Both of those should help make the program more resilient to these
> situations.
>
> Best,
> Stephan
>
>
>
> On Mon, Feb 20, 2017 at 7:51 PM, Stephan Ewen <[hidden email]
> > wrote:
>
>> Hi Vinay!
>>
>> Can you start by giving us a bit of an environment spec?
>>
>>   - What Flink version are you using?
>>   - What is your rough topology (what operations does the program use)
>>   - Where is the state (windows, keyBy)?
>>   - What is the rough size of your checkpoints and where does the time
>> go? Can you attach a screenshot from https://ci.apache.org/pro
>> jects/flink/flink-docs-release-1.2/monitoring/checkpoint_monitoring.html
>>   - What is the size of the JVM?
>>
>> Those things would be helpful to know...
>>
>> Best,
>> Stephan
>>
>>
>> On Mon, Feb 20, 2017 at 7:04 PM, vinay patil <[hidden email]
>> > wrote:
>>
>>> Hi Xiaogang,
>>>
>>> Thank you for your inputs.
>>>
>>> Yes I have already tried setting MaxBackgroundFlushes and
>>> MaxBackgroundCompactions to higher value (tried with 2, 4, 8) , still not
>>> getting expected results.
>>>
>>> System.getProperty("java.io.tmpdir") points to /tmp but there I could
>>> not find RocksDB logs, can you please let me know where can I find it ?
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Mon, Feb 20, 2017 at 7:32 AM, xiaogang.sxg [via Apache Flink User
>>> Mailing List archive.] <[hidden email]
>>> > wrote:
>>>
 Hi Vinay

 Can you provide the LOG file in RocksDB? It helps a lot to figure out
 the problems because it records the options and the events that happened
 during the execution. Unless otherwise configured, it should be located at the
 path set in System.getProperty("java.io.tmpdir").

 Typically, a large amount of memory is consumed by RocksDB to store
 necessary indices. To avoid the unlimited growth in the memory consumption,
 you can put these indices into block cache (set CacheIndexAndFilterBlock to
 true) and properly set the block cache size.

 You can also increase the number of background threads to improve the
 performance of flushes and compactions (via MaxBackgroundFlushes and
 MaxBackgroundCompactions).

 In YARN clusters, task managers will be killed if their memory
 utilization exceeds the allocation size. Currently Flink does not count the
 memory used by RocksDB in the allocation. We are working on fine-grained
 resource allocation (see FLINK-5131). It may help to avoid such problems.

 I hope the information helps you.

 Regards,
 Xiaogang
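
A minimal sketch (not from this thread) of wiring such RocksDB settings into the Flink 1.2 RocksDB state backend through an OptionsFactory; it belongs where the job sets up its environment, `env` is the StreamExecutionEnvironment, and the checkpoint URI and the concrete values are placeholders rather than recommendations:

RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
backend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // More background threads for flushes and compactions.
        return currentOptions
                .setMaxBackgroundFlushes(4)
                .setMaxBackgroundCompactions(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Keep index and filter blocks inside a bounded block cache instead of
        // letting them grow without limit outside of it.
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
                .setBlockCacheSize(256 * 1024 * 1024)       // 256 MB, placeholder
                .setCacheIndexAndFilterBlocks(true);
        return currentOptions.setTableFormatConfig(tableConfig);
    }
});
env.setStateBackend(backend);

(The backend classes come from flink-statebackend-rocksdb, the options classes from org.rocksdb.)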


 --
 From: Vinay Patil <[hidden email]
 >
 Sent: Friday, February 17, 2017 21:19
 To: user <[hidden email]
 >
 Subject: Re: Checkpointing with RocksDB as statebackend

 Hi Guys,

 There seems to be some issue with RocksDB memory utilization.

 Within a few minutes of the job running, the physical memory usage increases by
 4-5 GB and it keeps on increasing.
 I have tried different options for Max Buffer Size(30MB, 64MB, 128MB ,
 512MB) and Min Buffer to Merge as 2, but the physical memory keeps on
 increasing.

 According to RocksDB documentation, these are the main options on which
 flushing to storage is based.

 Can you please point out where I am going wrong? I have tried different
 configuration options, but each time the Task Manager gets killed
 after some time :)

 Regards,
 Vinay Patil

 On Thu, Feb 16, 2017 at 6:02 PM, Vinay Patil <[hidden email]
 

Re: Checkpointing with RocksDB as statebackend

2017-02-20 Thread vinay patil
Hi Stephan,

I am using Flink version 1.2.0, and running the job on YARN using
c3.4xlarge EC2 instances having 16 cores and 30GB RAM each.

In total I have set 32 slots and allotted 1200 network buffers.

I have attached the latest checkpointing snapshot, its configuration, CPU
load average, physical memory usage and heap memory usage here:

[checkpoint and memory screenshots attached; not preserved in the archive]

Before I describe the topology I want to let you know that when I enabled
object reuse, 32M records (64M in total - two Kafka sources) were processed in
17 minutes, and I did not see much of a halt in between. How does object reuse
help here? I have used the FLASH_SSD_OPTIMIZED option. This is the best result
I have got till now (the earlier time was 1hr 3minutes). But I don't understand
how it worked. :)
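
For reference, a minimal fragment (not the poster's exact code) of how object reuse is switched on through the execution config:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Lets chained operators hand the same record instance downstream instead of copying it,
// which saves copy/serialization overhead between chained operators.
env.getConfig().enableObjectReuse();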

The program uses the following operations:
1. Consume Data from two kafka sources
2. Extract the information from the record (flatmap)
3. Write as is data to S3  (sink)
4. Union both the streams and apply tumbling window on it to perform outer
join (keyBy->window->apply)
5. Some other operators downstream to enrich the data (map->flatMap->map)
6. Write the enriched data to S3 (sink)

I have allocated 8GB of heap space to each TM (find the 4th snap above)

The final aim is to test with a minimum of 100M records.

Let me know your inputs

Regards,
Vinay Patil





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11759.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Checkpointing with RocksDB as statebackend

2017-02-20 Thread Stephan Ewen
@Vinay!

Just saw the screenshot you attached to the first mail. The checkpoint that
failed came after one that had an incredibly heavy alignment phase (14 GB).
I think that working that off threw off the next checkpoint, because the workers
were still working through the alignment backlog.

I think you can for now fix this by setting the minimum pause between
checkpoints a bit higher (it is probably set a bit too small for the state
of your application).
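
For reference, a minimal fragment of how that pause can be raised on the checkpoint config (the values are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L);                                  // checkpoint interval
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);  // minimum pause between two checkpoints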

Also, can you describe what your sources are (Kafka / Kinesis or file
system)?

BTW: We are currently working on
  - incremental RocksDB checkpoints
  - the network stack to allow in the future for a new way of doing the
alignment

Both of those should help make the program more resilient to these
situations.

Best,
Stephan



On Mon, Feb 20, 2017 at 7:51 PM, Stephan Ewen  wrote:

> Hi Vinay!
>
> Can you start by giving us a bit of an environment spec?
>
>   - What Flink version are you using?
>   - What is your rough topology (what operations does the program use)
>   - Where is the state (windows, keyBy)?
>   - What is the rough size of your checkpoints and where does the time go?
> Can you attach a screenshot from https://ci.apache.org/
> projects/flink/flink-docs-release-1.2/monitoring/
> checkpoint_monitoring.html
>   - What is the size of the JVM?
>
> Those things would be helpful to know...
>
> Best,
> Stephan
>
>
> On Mon, Feb 20, 2017 at 7:04 PM, vinay patil 
> wrote:
>
>> Hi Xiaogang,
>>
>> Thank you for your inputs.
>>
>> Yes I have already tried setting MaxBackgroundFlushes and
>> MaxBackgroundCompactions to higher value (tried with 2, 4, 8) , still not
>> getting expected results.
>>
>> System.getProperty("java.io.tmpdir") points to /tmp but there I could
>> not find RocksDB logs, can you please let me know where can I find it ?
>>
>> Regards,
>> Vinay Patil
>>
>> On Mon, Feb 20, 2017 at 7:32 AM, xiaogang.sxg [via Apache Flink User
>> Mailing List archive.] <[hidden email]
>> > wrote:
>>
>>> Hi Vinay
>>>
>>> Can you provide the LOG file in RocksDB? It helps a lot to figure out
>>> the problems because it records the options and the events that happened
>>> during the execution. Unless otherwise configured, it should be located at the
>>> path set in System.getProperty("java.io.tmpdir").
>>>
>>> Typically, a large amount of memory is consumed by RocksDB to store
>>> necessary indices. To avoid the unlimited growth in the memory consumption,
>>> you can put these indices into block cache (set CacheIndexAndFilterBlock to
>>> true) and properly set the block cache size.
>>>
>>> You can also increase the number of background threads to improve the
>>> performance of flushes and compactions (via MaxBackgroundFlushes and
>>> MaxBackgroundCompactions).
>>>
>>> In YARN clusters, task managers will be killed if their memory
>>> utilization exceeds the allocation size. Currently Flink does not count the
>>> memory used by RocksDB in the allocation. We are working on fine-grained
>>> resource allocation (see FLINK-5131). It may help to avoid such problems.
>>>
>>> I hope the information helps you.
>>>
>>> Regards,
>>> Xiaogang
>>>
>>>
>>> --
>>> From: Vinay Patil <[hidden email]
>>> >
>>> Sent: Friday, February 17, 2017 21:19
>>> To: user <[hidden email]
>>> >
>>> Subject: Re: Checkpointing with RocksDB as statebackend
>>>
>>> Hi Guys,
>>>
>>> There seems to be some issue with RocksDB memory utilization.
>>>
>>> Within a few minutes of the job running, the physical memory usage increases by 4-5
>>> GB and it keeps on increasing.
>>> I have tried different options for Max Buffer Size(30MB, 64MB, 128MB ,
>>> 512MB) and Min Buffer to Merge as 2, but the physical memory keeps on
>>> increasing.
>>>
>>> According to RocksDB documentation, these are the main options on which
>>> flushing to storage is based.
>>>
>>> Can you please point out where I am going wrong? I have tried different
>>> configuration options, but each time the Task Manager gets killed
>>> after some time :)
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Feb 16, 2017 at 6:02 PM, Vinay Patil <[hidden email]
>>> > wrote:
>>> I think it's more related to RocksDB; I am also not familiar with
>>> RocksDB, but I am reading the tuning guide to understand the important values
>>> that can be set.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Feb 16, 2017 at 5:48 PM, Stefan Richter [via Apache Flink User
>>> Mailing List archive.] <[hidden email]
>>> > wrote:
>>> What kind of problem are we talking about? S3 related or RocksDB
>>> related. I am not aware of problems with RocksDB per se. I think seeing
>>> logs for this would be very helpful.
>>>
>>> On 16.02.2017 at 11:56, Aljoscha Krettek wrote <[hidden email]
>>> 

Re: Log4J

2017-02-20 Thread Stephan Ewen
How about adding this to the "logging" docs - a section on how to run log4j2

On Mon, Feb 20, 2017 at 8:50 AM, Robert Metzger  wrote:

> Hi Chet,
>
> These are the files I have in my lib/ folder with the working log4j2
> integration:
>
> -rw-r--r--  1 robert robert 79966937 Oct 10 13:49 flink-dist_2.10-1.1.3.jar
> -rw-r--r--  1 robert robert    90883 Dec  9 20:13 flink-python_2.10-1.1.3.jar
> -rw-r--r--  1 robert robert    60547 Dec  9 18:45 log4j-1.2-api-2.7.jar
> -rw-rw-r--  1 robert robert  1638598 Oct 22 16:08 log4j2-gelf-1.3.1-shaded.jar
> -rw-rw-r--  1 robert robert     1056 Dec  9 20:12 log4j2.properties
> -rw-r--r--  1 robert robert   219001 Dec  9 18:45 log4j-api-2.7.jar
> -rw-r--r--  1 robert robert  1296865 Dec  9 18:45 log4j-core-2.7.jar
> -rw-r--r--  1 robert robert    22918 Dec  9 18:46 log4j-slf4j-impl-2.7.jar
>
> You don't need the "log4j2-gelf-1.3.1-shaded.jar", that's a GELF appender
> for Greylog2.
>
> On Mon, Feb 20, 2017 at 5:41 AM, Chet Masterson  > wrote:
>
>> I read through the link you provided, Stephan. However, I am still
>> confused. The instructions mention specific jar files for Logback, I am not
>> sure which of the log4j 2.x jars I need to put in the the flink /lib
>> directory. I tried various combinations of log4j-1.2-api-2.8.jar,
>> log4j-slf4j-impl-2.8.jar, log4j-to-slf4j-2.8.jar, and renamed the stock
>> log4j-1.2.17.jar and slf4j-log4j12-1.7.7.jar, but then the job manager
>> would not start, and threw a 'NoClassDefFoundError:
>> org/apache/logging/log4j/LogManager'. And this is without deploying my
>> job out there, so I don't think any of the "Use Logback when running Flink
>> out of the IDE / from a Java application" section instructions are relevant.
>>
>> Can someone be more specific how to do this? If I get it to work, I'll be
>> happy to formally document it in whatever format would help the project out
>> long term.
>>
>> Thanks!
>>
>>
>> 16.02.2017, 05:54, "Stephan Ewen" :
>>
>> Hi!
>>
>> The bundled log4j version (1.x) does not support that.
>>
>> But you can replace the logging jars with those of a different framework
>> (like log4j 2.x), which supports changing the configuration without
>> stopping the application.
>> You don't need to rebuild flink, simply replace two jars in the "lib"
>> folder (and update the config file, because log4j 2.x has a different
>> config format).
>>
>> This guide shows how to swap log4j 1.x for logback, and you should be
>> able to swap in log4j 2.x in the exact same way.
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>> monitoring/best_practices.html#use-logback-when-running-
>> flink-on-a-cluster
>>
>>
>> On Thu, Feb 16, 2017 at 5:20 AM, Chet Masterson <
>> chet.master...@yandex.com> wrote:
>>
>> Is there a way to reload a log4j.properties file without stopping and
>> starting the job server?
>>
>>
>


Re: blob store defaults to /tmp and files get deleted

2017-02-20 Thread Stephan Ewen
Hi Shannon!

In the latest HA and BlobStore changes (1.3) it uses "/tmp" only for
caching and will re-obtain the files from the persistent storage.

I think we should make this a bigger point, even:
  - Flink should not use "/tmp" at all (except for mini cluster mode)
  - Yarn and Mesos should always use the "local directory" for temporary
files. They are cleaned up anyways.
  - For the Standalone Setup, one should configure a suitable temp storage
dir (and everything should be relative to that).

Stephan



On Mon, Feb 20, 2017 at 3:22 PM, Ufuk Celebi  wrote:

> Hey Shannon,
>
> good idea! We currently have this:
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/ops/production_ready.html
>
> It has a strong focus on managed state and not the points you mentioned.
>
> Would you like to create an issue for adding this to the production
> check list? I think it's valuable feedback.
>
> – Ufuk
>
>
> On Fri, Feb 17, 2017 at 7:24 PM, Shannon Carey  wrote:
> > A few of my jobs recently failed and showed this exception:
> >
> >
> > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> load
> > user class: org.apache.flink.streaming.connectors.kafka.
> FlinkKafkaConsumer09
> > ClassLoader info: URL ClassLoader:
> > file:
> > '/tmp/blobStore-5f023409-6af5-4de6-8ed0-e80a2eb9633e/cache/blob_
> d9a9fb884f3b436030afcf7b8e1bce678acceaf2'
> > (invalid JAR: zip file is empty)
> > Class not resolvable through given classloader.
> >   at
> > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(
> StreamConfig.java:208)
> >   at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:224)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> >   at java.lang.Thread.run(Thread.java:745)
> >
> >
> > As you can see, Flink is storing things underneath /tmp, which is the
> > (undocumented) default for the blob store. As you may know, on Linux,
> > there's typically a program such as tmpwatch which is run periodically to
> > clear out data from /tmp.
> >
> >
> > Flink also uses /tmp as the default for jobmanager.web.tmpdir (and
> > jobmanager.web.upload.dir in 1.2).
> >
> >
> > Therefore, assuming that this is indeed the cause of the job failure/the
> > exception, it seems highly advisable that when you run a Flink cluster
> you
> > configure blob.storage.directory and jobmanager.web.tmpdir to a specific
> > folder that is not beneath /tmp. I don't know if there is any material
> about
> > setting up a production cluster, but this would definitely seem to be a
> > necessary configuration to specify if you want to avoid problems.
> Enabling
> > High Availability mode should also be on that list, I think.
> >
> >
> > -Shannon
>


Re: Is it OK to have very many session windows?

2017-02-20 Thread Stephan Ewen
With pre-aggregation (which the Reduce does), Flink can handle many windows
and many keys, as long as you have the memory and storage to support that.
Your case should work.


On Mon, Feb 20, 2017 at 4:58 PM, Vadim Vararu 
wrote:

> It's something like:
>
> DataStreamSource stream = env.addSource(getKafkaConsumer(parameterTool));
> stream
>     .map(getEventToDomainMapper())
>     .keyBy(getKeySelector())
>     .window(ProcessingTimeSessionWindows.withGap(Time.minutes(90)))
>     .reduce(getReducer())
>     .map(getToJsonMapper())
>     .addSink(getKafkaProducer(parameterTool));
>
>
> Each new event may be reduced against the existing state from the window,
> so normally it's okay to have only 1 row in memory.
>
> I'm new to Flink and have not yet reached the "incremental aggregates",
> but, if I understand correctly, it fits my case.
>
> Vadim.
>
>
>
>
> On 20.02.2017 17:47, Timo Walther wrote:
>
> Hi Vadim,
>
> this of course depends on your use case. The question is how large is your
> state per pane and how much memory is available for Flink?
> Are you using incremental aggregates such that only the aggregated value
> per pane has to be kept in memory?
>
> Regards,
> Timo
>
>
> On 20/02/17 at 16:34, Vadim Vararu wrote:
>
> Hi guys,
>
> Is it okay to have very many (tens of thousands or hundreds of thousands)
> of session windows?
>
>
> Thanks, Vadim.
>
>
>
>
>


Re: How to achieve exactly once on node failure using Kafka

2017-02-20 Thread Stephan Ewen
Hi!

Exactly-once end-to-end requires sinks that support that kind of behavior
(typically some form of transactions support).

Kafka currently does not have the mechanisms in place to support
exactly-once sinks, but the Kafka project is working on that feature.
For ElasticSearch, it is also not simply possible (because of missing
transactions), but you can use Flink's state as the "authoritative" state (it
is exactly once) and then write changes to Flink's state to Elastic. That
way the writes to ElasticSearch become "idempotent", which means duplicates
simply make no additional changes.
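
As an illustration of the idempotent-write idea (a sketch only, assuming the elasticsearch2 connector and a stream of Tuple2<String, String> pairs of a stable record id and a JSON body; index and type names are placeholders):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class IdempotentIndexer implements ElasticsearchSinkFunction<Tuple2<String, String>> {

    @Override
    public void process(Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) {
        // element.f0 = stable unique id of the record, element.f1 = JSON document body
        IndexRequest request = Requests.indexRequest()
                .index("events")        // placeholder index name
                .type("event")          // placeholder type name
                .id(element.f0)         // deterministic id: a replay overwrites instead of duplicating
                .source(element.f1);    // document body as a JSON string
        indexer.add(request);
    }
}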

Hope that helps!

Stephan




On Mon, Feb 20, 2017 at 5:53 PM, Y. Sakamoto  wrote:

> Hi,
> I'm using Flink 1.2.0 and try to do "exactly once" data transfer
> from Kafka to Elasticsearch, but I cannot.
> (Scala 2.11, Kafka 0.10, without YARN)
>
> There are 2 Flink TaskManager nodes, and when processing
> with 2 parallelism, shutdown one of them (simulating node failure).
>
> Using flink-connector-kafka, I wrote following code:
>
>StreamExecutionEnvironment env = StreamExecutionEnvironment
>  .getExecutionEnvironment();
>env.enableCheckpointing(1000L);
>env.setParallelism(2);
>
>Properties kafkaProp = new Properties();
>kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092");
>kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181");
>kafkaProp.setProperty("group.id", "id");
>
>DataStream stream = env.addSource(new FlinkKafkaConsumer010<>(
>  "topic", new SimpleStringSchema(), kafkaProp));
>
> I found duplicated data transfer on map function.
> Data from the checkpoint before node failure seems duplicated.
>
> Is there any way to achieve "exactly once" on failure?
>
>
> Thanks.
> Yuichiro
>


Re: Checkpointing with RocksDB as statebackend

2017-02-20 Thread Stephan Ewen
Hi Vinay!

Can you start by giving us a bit of an environment spec?

  - What Flink version are you using?
  - What is your rough topology (what operations does the program use)
  - Where is the state (windows, keyBy)?
  - What is the rough size of your checkpoints and where does the time go?
Can you attach a screenshot from
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/checkpoint_monitoring.html
  - What is the size of the JVM?

Those things would be helpful to know...

Best,
Stephan


On Mon, Feb 20, 2017 at 7:04 PM, vinay patil 
wrote:

> Hi Xiaogang,
>
> Thank you for your inputs.
>
> Yes I have already tried setting MaxBackgroundFlushes and
> MaxBackgroundCompactions to higher value (tried with 2, 4, 8) , still not
> getting expected results.
>
> System.getProperty("java.io.tmpdir") points to /tmp but there I could not
> find RocksDB logs, can you please let me know where can I find it ?
>
> Regards,
> Vinay Patil
>
> On Mon, Feb 20, 2017 at 7:32 AM, xiaogang.sxg [via Apache Flink User
> Mailing List archive.] <[hidden email]
> > wrote:
>
>> Hi Vinay
>>
>> Can you provide the LOG file in RocksDB? It helps a lot to figure out
>> the problems because it records the options and the events that happened
>> during the execution. Unless otherwise configured, it should be located at the path
>> set in System.getProperty("java.io.tmpdir").
>>
>> Typically, a large amount of memory is consumed by RocksDB to store
>> necessary indices. To avoid the unlimited growth in the memory consumption,
>> you can put these indices into block cache (set CacheIndexAndFilterBlock to
>> true) and properly set the block cache size.
>>
>> You can also increase the number of background threads to improve the
>> performance of flushes and compactions (via MaxBackgroundFlushes and
>> MaxBackgroundCompactions).
>>
>> In YARN clusters, task managers will be killed if their memory
>> utilization exceeds the allocation size. Currently Flink does not count the
>> memory used by RocksDB in the allocation. We are working on fine-grained
>> resource allocation (see FLINK-5131). It may help to avoid such problems.
>>
>> I hope the information helps you.
>>
>> Regards,
>> Xiaogang
>>
>>
>> --
>> From: Vinay Patil <[hidden email]
>> >
>> Sent: Friday, February 17, 2017 21:19
>> To: user <[hidden email]
>> >
>> Subject: Re: Checkpointing with RocksDB as statebackend
>>
>> Hi Guys,
>>
>> There seems to be some issue with RocksDB memory utilization.
>>
>> Within a few minutes of the job running, the physical memory usage increases by 4-5
>> GB and it keeps on increasing.
>> I have tried different options for Max Buffer Size(30MB, 64MB, 128MB ,
>> 512MB) and Min Buffer to Merge as 2, but the physical memory keeps on
>> increasing.
>>
>> According to RocksDB documentation, these are the main options on which
>> flushing to storage is based.
>>
>> Can you please point out where I am going wrong? I have tried different
>> configuration options, but each time the Task Manager gets killed
>> after some time :)
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Feb 16, 2017 at 6:02 PM, Vinay Patil <[hidden email]
>> > wrote:
>> I think it's more related to RocksDB; I am also not familiar with RocksDB,
>> but I am reading the tuning guide to understand the important values that can be
>> set.
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Feb 16, 2017 at 5:48 PM, Stefan Richter [via Apache Flink User
>> Mailing List archive.] <[hidden email]
>> > wrote:
>> What kind of problem are we talking about? S3 related or RocksDB related.
>> I am not aware of problems with RocksDB per se. I think seeing logs for
>> this would be very helpful.
>>
>> On 16.02.2017 at 11:56, Aljoscha Krettek <[hidden email]
>> > wrote:
>>
>> [hidden email]  and 
>> [hidden
>> email]  could this
>> be the same problem that you recently saw when working with other people?
>>
>> On Wed, 15 Feb 2017 at 17:23 Vinay Patil <[hidden email]
>> > wrote:
>> Hi Guys,
>>
>> Can anyone please help me with this issue
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Feb 15, 2017 at 6:17 PM, Vinay Patil <[hidden email]
>> > wrote:
>> Hi Ted,
>>
>> I have 3 boxes in my pipeline , 1st and 2nd box containing source and s3
>> sink and the 3rd box is window operator followed by chained operators and a
>> s3 sink
>>
>> So in the details link section I can see that the S3 sink is taking time
>> for the acknowledgement and it is not even going to the window operator
>> chain.
>>
>> But as shown in the 

Re: Checkpointing with RocksDB as statebackend

2017-02-20 Thread vinay patil
Hi Xiaogang,

Thank you for your inputs.

Yes I have already tried setting MaxBackgroundFlushes and
MaxBackgroundCompactions to higher value (tried with 2, 4, 8) , still not
getting expected results.

System.getProperty("java.io.tmpdir") points to /tmp but there I could not
find RocksDB logs, can you please let me know where can I find it ?

Regards,
Vinay Patil

On Mon, Feb 20, 2017 at 7:32 AM, xiaogang.sxg [via Apache Flink User
Mailing List archive.]  wrote:

> Hi Vinay
>
> Can you provide the LOG file in RocksDB? It helps a lot to figure out the
> problems because it records the options and the events that happened during the
> execution. Unless otherwise configured, it should be located at the path set in
> System.getProperty("java.io.tmpdir").
>
> Typically, a large amount of memory is consumed by RocksDB to store
> necessary indices. To avoid the unlimited growth in the memory consumption,
> you can put these indices into block cache (set CacheIndexAndFilterBlock to
> true) and properly set the block cache size.
>
> You can also increase the number of background threads to improve the
> performance of flushes and compactions (via MaxBackgroundFlushes and
> MaxBackgroundCompactions).
>
> In YARN clusters, task managers will be killed if their memory utilization
> exceeds the allocation size. Currently Flink does not count the memory used
> by RocksDB in the allocation. We are working on fine-grained resource
> allocation (see FLINK-5131). It may help to avoid such problems.
>
> I hope the information helps you.
>
> Regards,
> Xiaogang
>
>
> --
> From: Vinay Patil <[hidden email]
> >
> Sent: Friday, February 17, 2017 21:19
> To: user <[hidden email]
> >
> Subject: Re: Checkpointing with RocksDB as statebackend
>
> Hi Guys,
>
> There seems to be some issue with RocksDB memory utilization.
>
> Within a few minutes of the job running, the physical memory usage increases by 4-5
> GB and it keeps on increasing.
> I have tried different options for Max Buffer Size(30MB, 64MB, 128MB ,
> 512MB) and Min Buffer to Merge as 2, but the physical memory keeps on
> increasing.
>
> According to RocksDB documentation, these are the main options on which
> flushing to storage is based.
>
> Can you please point out where I am going wrong? I have tried different
> configuration options, but each time the Task Manager gets killed
> after some time :)
>
> Regards,
> Vinay Patil
>
> On Thu, Feb 16, 2017 at 6:02 PM, Vinay Patil <[hidden email]
> > wrote:
> I think it's more related to RocksDB; I am also not familiar with RocksDB,
> but I am reading the tuning guide to understand the important values that can be
> set.
>
> Regards,
> Vinay Patil
>
> On Thu, Feb 16, 2017 at 5:48 PM, Stefan Richter [via Apache Flink User
> Mailing List archive.] <[hidden email]
> > wrote:
> What kind of problem are we talking about? S3 related or RocksDB related.
> I am not aware of problems with RocksDB per se. I think seeing logs for
> this would be very helpful.
>
> On 16.02.2017 at 11:56, Aljoscha Krettek <[hidden email]
> > wrote:
>
> [hidden email]  and 
> [hidden
> email]  could this
> be the same problem that you recently saw when working with other people?
>
> On Wed, 15 Feb 2017 at 17:23 Vinay Patil <[hidden email]
> > wrote:
> Hi Guys,
>
> Can anyone please help me with this issue
>
> Regards,
> Vinay Patil
>
> On Wed, Feb 15, 2017 at 6:17 PM, Vinay Patil <[hidden email]
> > wrote:
> Hi Ted,
>
> I have 3 boxes in my pipeline , 1st and 2nd box containing source and s3
> sink and the 3rd box is window operator followed by chained operators and a
> s3 sink
>
> So in the details link section I can see that the S3 sink is taking time
> for the acknowledgement and it is not even going to the window operator
> chain.
>
> But as shown in the snapshot ,checkpoint id 19 did not get any
> acknowledgement. Not sure what is causing the issue
>
> Regards,
> Vinay Patil
>
> On Wed, Feb 15, 2017 at 5:51 PM, Ted Yu [via Apache Flink User Mailing
> List archive.] <[hidden email]
> > wrote:
> What did the More Details link say ?
>
> Thanks
>
> > On Feb 15, 2017, at 3:11 AM, vinay patil <[hidden email]
> > wrote:
> >
> > Hi,
> >
> > I have kept the checkpointing interval to 6secs and minimum pause
> between
> > checkpoints to 5secs, while testing the pipeline I have observed that
> > for some checkpoints it is taking a long time , as you can see in the
> attached
> > snapshot checkpoint id 19 took 

How to achieve exactly once on node failure using Kafka

2017-02-20 Thread Y. Sakamoto

Hi,
I'm using Flink 1.2.0 and trying to do "exactly once" data transfer
from Kafka to Elasticsearch, but I cannot.
(Scala 2.11, Kafka 0.10, without YARN)

There are 2 Flink TaskManager nodes, and when processing
with 2 parallelism, shutdown one of them (simulating node failure).

Using flink-connector-kafka, I wrote the following code:

    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment();
    env.enableCheckpointing(1000L);
    env.setParallelism(2);

    Properties kafkaProp = new Properties();
    kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092");
    kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181");
    kafkaProp.setProperty("group.id", "id");

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
            "topic", new SimpleStringSchema(), kafkaProp));

I found duplicated data transfer on map function.
Data from the checkpoint before node failure seems duplicated.

Is there any way to achieve "exactly once" on failure?


Thanks.
Yuichiro


Apache Flink and Elasticsearch send Json Object instead of string

2017-02-20 Thread Fábio Dias
Hi,

I'm using Flink and Elasticsearch and I want to receive in Elasticsearch a
JSON object ({"id":1, "name":"X"} etc...). I already have a string with
this information, but I don't want to save it as a string.

I receive this:

{
  "_index": "logs",
  "_type": "object",
  "_id": "AVpcARfkfYWqSubr0ZvK",
  "_score": 1,
  "_source": {
"data": "{\"id\":6,\"name\":\"A green
door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}"
  }
}

And I want to receive this:

{
"_index": "logs",
"_type": "external",
"_id": "AVpcARfkfYWqSubr0ZvK",
"_score": 1,
"_source": {
"data": {
"id":6,
"name":"A green door",
"price":12.5,
"tags":
["home","green"]
}
}
}

my java code:

try {
    ArrayList<InetSocketAddress> transports = new ArrayList<>();
    transports.add(new InetSocketAddress("127.0.0.1", 9300));

    ElasticsearchSinkFunction<String> indexLog = new ElasticsearchSinkFunction<String>() {

        private static final long serialVersionUID = 8802869701292023100L;

        public IndexRequest createIndexRequest(String element) {

            HashMap<String, String> esJson = new HashMap<>();

            esJson.put("data", element);

            return Requests
                    .indexRequest()
                    .index("logs")
                    .type("object")
                    .source(esJson);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    };

    ElasticsearchSink<String> esSink = new ElasticsearchSink<>(config, transports, indexLog);
    input.addSink(esSink);
}
catch (Exception e) {
    System.out.println(e);
}


Do I need to treat every entry as a map? Can I just send an object with key
values?

Thanks.


Re: Is it OK to have very many session windows?

2017-02-20 Thread Vadim Vararu

It's something like:

DataStreamSource stream = env.addSource(getKafkaConsumer(parameterTool));
stream
    .map(getEventToDomainMapper())
    .keyBy(getKeySelector())
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(90)))
    .reduce(getReducer())
    .map(getToJsonMapper())
    .addSink(getKafkaProducer(parameterTool));


Each new event may be reduced against the existing state from the
window, so normally it's okay to have only 1 row in memory.


I'm new to Flink and have not yet reached the "incremental aggregates",
but, if I understand correctly, it fits my case.


Vadim.



On 20.02.2017 17:47, Timo Walther wrote:

Hi Vadim,

this of course depends on your use case. The question is how large is 
your state per pane and how much memory is available for Flink?
Are you using incremental aggregates such that only the aggregated 
value per pane has to be kept in memory?


Regards,
Timo


On 20/02/17 at 16:34, Vadim Vararu wrote:

Hi guys,

Is it okay to have very many (tens of thousands or hundreds of
thousands) of session windows?



Thanks, Vadim.







Re: Is it OK to have very many session windows?

2017-02-20 Thread Timo Walther

Hi Vadim,

this of course depends on your use case. The question is how large is 
your state per pane and how much memory is available for Flink?
Are you using incremental aggregates such that only the aggregated value 
per pane has to be kept in memory?


Regards,
Timo


On 20/02/17 at 16:34, Vadim Vararu wrote:

Hi guys,

Is it okay to have very many (tens of thousands or hundreds of
thousands) of session windows?



Thanks, Vadim.





Is it OK to have very many session windows?

2017-02-20 Thread Vadim Vararu

Hi guys,

Is it okay to have very many (tens of thousands or hundreds of thousands)
of session windows?



Thanks, Vadim.



Re: Is a new window created for each key/group?

2017-02-20 Thread Sonex
Yes, you are correct. A window will be created for each key/group and then
you can apply a function, or aggregate elements per key.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-a-new-window-created-for-each-key-group-tp11745p11746.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: blob store defaults to /tmp and files get deleted

2017-02-20 Thread Ufuk Celebi
Hey Shannon,

good idea! We currently have this:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/production_ready.html

It has a strong focus on managed state and not the points you mentioned.

Would you like to create an issue for adding this to the production
check list? I think it's valuable feedback.

– Ufuk


On Fri, Feb 17, 2017 at 7:24 PM, Shannon Carey  wrote:
> A few of my jobs recently failed and showed this exception:
>
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
> user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> ClassLoader info: URL ClassLoader:
> file:
> '/tmp/blobStore-5f023409-6af5-4de6-8ed0-e80a2eb9633e/cache/blob_d9a9fb884f3b436030afcf7b8e1bce678acceaf2'
> (invalid JAR: zip file is empty)
> Class not resolvable through given classloader.
>   at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:208)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> As you can see, Flink is storing things underneath /tmp, which is the
> (undocumented) default for the blob store. As you may know, on Linux,
> there's typically a program such as tmpwatch which is run periodically to
> clear out data from /tmp.
>
>
> Flink also uses /tmp as the default for jobmanager.web.tmpdir (and
> jobmanager.web.upload.dir in 1.2).
>
>
> Therefore, assuming that this is indeed the cause of the job failure/the
> exception, it seems highly advisable that when you run a Flink cluster you
> configure blob.storage.directory and jobmanager.web.tmpdir to a specific
> folder that is not beneath /tmp. I don't know if there is any material about
> setting up a production cluster, but this would definitely seem to be a
> necessary configuration to specify if you want to avoid problems. Enabling
> High Availability mode should also be on that list, I think.
>
>
> -Shannon


Re: Previously working job fails on Flink 1.2.0

2017-02-20 Thread Stefan Richter
Hi,

Flink 1.2 partitions all keys into key-groups, the atomic units for 
rescaling. This partitioning is done by hash partitioning and is also in sync 
with the routing of tuples to operator instances (each parallel instance of a 
keyed operator is responsible for some range of key-groups). This exception 
means that Flink detected a tuple in the state backend of a parallel operator 
instance that should not be there because, by its key hash, it belongs to a 
different key-group. Or phrased differently, this tuple belongs to a different 
parallel operator instance. Whether this is a Flink bug or a user code bug is very 
hard to tell, and the log does not provide additional insights either. I could see 
this happening in case your keys are mutable and your code makes some changes 
to the object that change the hash code. Another question is: did you 
migrate your job from Flink 1.1.3 through an old savepoint, or did you do a 
fresh start? Other than that, I can recommend checking your code for mutation 
of keys. If this fails deterministically, you could also try to set a 
breakpoint at the line of the exception and take a look at whether the key that is 
about to be inserted is somehow special.

Best,
Stefan 


> On 20.02.2017 at 14:32, Steffen Hausmann wrote:
> 
> Hi there,
> 
> I’m having problems running a job on Flink 1.2.0 that successfully executes 
> on Flink 1.1.3. The job is supposed to read events from a Kinesis stream and 
> to send outputs to Elasticsearch and it actually initiates successfully on a 
> Flink 1.2.0 cluster running on YARN, but as soon as I start to ingest events 
> into the Kinesis stream, the job fails (see the attachment for more 
> information):
> 
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
> 
> at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
> 
> at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
> 
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
> 
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
> 
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> 
> at java.lang.Thread.run(Thread.java:745)
> 
> Any ideas what’s going wrong here? The job executes successfully when it’s 
> compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 cluster. 
> Does this indicate a bug in my code or is this rather a bug in Flink? How can 
> I further debug this?
> 
> Any guidance is highly appreciated.
> 
> Thanks,
> 
> Steffen
> 
> 



回复:Transfer information from one window to the next

2017-02-20 Thread 施晓罡(星罡)
Hi Sonex,

I think you can accomplish it by using a PassThroughFunction as the apply
function and processing the elements in a rich flatMap function that follows. You
can keep the information in the flatmap function (via states) so that it can
be shared among different windows.

The program may look like:

stream.keyBy(...).timeWindow(...)
    .apply(new WindowFunction() {
        public void apply(K key, W window, Iterable elements, Collector out) {
            out.collect(new Tuple3<>(key, window, elements));
        }
    })
    .keyBy(0)     // use the same key as the windows
    .flatMap(...) // process the windows with shared information

Regards,
Xiaogang

------------------------------------------------------------------
From: Sonex
Sent: Monday, February 20, 2017 16:32
To: user
Subject: Transfer information from one window to the next
val stream = inputStream
    .assignAscendingTimestamps(_.eventTime)
    .keyBy(_.inputKey)
    .timeWindow(Time.seconds(3600), Time.seconds(3600))

stream.apply{...}

Given the above example I want to transfer information (variables and
values) from the current apply function to the apply function of the next
window (of course with the same key).

How can I do that?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Transfer-information-from-one-window-to-the-next-tp11737.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: Jobmanager was killed when disk less 10% in yarn

2017-02-20 Thread lining jing
I have seen the log, but did not find any information. I just got some
information about the machine running this node: disk less than 10% free.

2017-02-20 14:03 GMT+08:00 wangzhijiang999 :

> The log just indicates that the SignalHandler handled the kill signal and the
> JobManager process exited; the reason cannot be determined from it.
> You may check the container log from the node manager to see why it was killed.
>
> Best,
>
> Zhijiang
>
> --
> From: lining jing
> Sent: Monday, February 20, 2017 10:13
> To: user
> Subject: Jobmanager was killed when disk less 10% in yarn
>
> Hi,
>
> I use YARN to manage resources. Recently, when the disk was less than 10% free, the JobManager was
> killed. I want to know whether the reason is the disk problem.
>
>
> log :
>
>
> 2017-02-19 03:20:37,087 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner
> - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> 2017-02-19 03:20:37,088 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Stopping checkpoint coordinator for job 1b45608e30808183913eeffbb4d855
> da
> 2017-02-19 03:20:37,088 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Stopping checkpoint coordinator for job 1b45608e30808183913eeffbb4d855
> da
> 2017-02-19 03:20:37,089 INFO  org.apache.flink.runtime.blob.BlobCache
>   - Shutting down BlobCache
> 2017-02-19 03:20:37,089 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
> - Removing web dashboard root cache directory
> /tmp/flink-web-dfa2b369-44ea-4e35-8011-672a1e627a10
> 2017-02-19 03:20:37,089 INFO  org.apache.flink.runtime.blob.BlobCache
>   - Shutting down BlobCache
> 2017-02-19 03:20:37,137 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
> - Removing web dashboard jar upload directory /tmp/flink-web-upload-
> d6edb5ea-5894-489b-89f7-f2972fc9433d
> 2017-02-19 03:20:37,138 INFO  org.apache.flink.runtime.blob.BlobServer
>- Stopped BLOB server at 0.0.0.0:54513
>
>
>
>