Multiple output operations in a job vs multiple jobs

2018-07-31 Thread anna stax
Hi all,

I am not sure when I should go for multiple jobs or have 1 job with all the
sources and sinks. Following is my code.

   val env = StreamExecutionEnvironment.getExecutionEnvironment
...
// create a Kafka source
val srcstream = env.addSource(consumer)

srcstream
  .keyBy(0)
  .window(ProcessingTimeSessionWindows.withGap(Time.days(14)))
  .reduce  ...
  .map ...
  .addSink ...

srcstream
  .keyBy(0)
  .window(ProcessingTimeSessionWindows.withGap(Time.days(28)))
  .reduce  ...
  .map ...
  .addSink ...

env.execute("Job1")

My questions

1. The srcstream is a very high volume stream and the window size is 2
weeks and 4 weeks. Is the window size a problem? In this case, I think it
is not a problem because I am using reduce which stores only 1 value per
window. Is that right?

2. I am having 2 output operations one with 2 weeks window and the other
with 4 weeks window. Are they executed in parallel or in sequence?

3. When I have multiple output operations like in this case should I break
it into 2 different jobs ?

4. Can I run multiple jobs on the same cluster?

Thanks


Re: scala IT

2018-07-31 Thread Nicos Maris
Isn't the returns function deprecated?

On Tue, Jul 31, 2018, 5:32 AM vino yang  wrote:

> Hi Nicos,
>
> The thrown exception has given you a clear solution hint:
> "The return type of function 'apply(MultiplyByTwoTest.scala:43)' could not
> be determined automatically, due to type erasure. You can give type
> information hints by using the returns(...) method on the result of the
> transformation call, or by letting your function implement the
> 'ResultTypeQueryable' interface."
>
> You can consider the second option. More information on the Flink type
> system can be found in the official documentation[1].
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/types_serialization.html#type-information-in-the-scala-api
>
> Thanks, vino.
>
> 2018-07-31 6:30 GMT+08:00 Nicos Maris :
>
>> Hi all,
>>
>>
>> the integration test in scala documented at the testing section fails:
>>
>> https://travis-ci.org/nicosmaris/HelloDockerScalaSbt/builds/410075764
>>
>> In previous commits of my demo repo, I tried TypeExtractor, BasicTypeInfo
>> and ResultTypeQueryable with no success. I am new to Flink and to Scala and
>> I would like to have both, not Flink with Java.
>>
>> Am I doing something that is fundamentally wrong?
>>
>>
>> thanks,
>> Nicos Maris
>>
>
>
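
For reference, a minimal sketch of the second option mentioned above
(implementing ResultTypeQueryable), assuming the failing function is a simple
MapFunction[Long, Long]; the class name and types are illustrative, not taken
from the failing test:

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.scala._

// Implementing ResultTypeQueryable lets Flink ask the function for its output
// type directly, instead of trying to extract it and failing due to erasure.
class MultiplyByTwo extends MapFunction[Long, Long] with ResultTypeQueryable[Long] {

  override def map(value: Long): Long = value * 2L

  override def getProducedType: TypeInformation[Long] = createTypeInformation[Long]
}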


Re: scala IT

2018-07-31 Thread vino yang
Hi Nicos,

The returns API is not deprecated; it is just that it is part of the
DataStream Java API.

Thanks, vino.

2018-07-31 15:15 GMT+08:00 Nicos Maris :

> Isn't the returns function deprecated?
>
> On Tue, Jul 31, 2018, 5:32 AM vino yang  wrote:
>
>> Hi Nicos,
>>
>> The thrown exception has given you a clear solution hint:
>> "The return type of function 'apply(MultiplyByTwoTest.scala:43)' could not
>> be determined automatically, due to type erasure. You can give type
>> information hints by using the returns(...) method on the result of the
>> transformation call, or by letting your function implement the
>> 'ResultTypeQueryable' interface."
>>
>> You can consider the second option. More information on the Flink type
>> system can be found in the official documentation[1].
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-
>> release-1.5/dev/types_serialization.html#type-
>> information-in-the-scala-api
>>
>> Thanks, vino.
>>
>> 2018-07-31 6:30 GMT+08:00 Nicos Maris :
>>
>>> Hi all,
>>>
>>>
>>> the integration test in scala documented at the testing section fails:
>>>
>>> https://travis-ci.org/nicosmaris/HelloDockerScalaSbt/builds/410075764
>>>
>>> In previous commits of my demo repo, I tried TypeExtractor,
>>> BasicTypeInfo and ResultTypeQueryable with no success. I am new to Flink and
>>> to Scala and I would like to have both, not Flink with Java.
>>>
>>> Am I doing something that is fundamentally wrong?
>>>
>>>
>>> thanks,
>>> Nicos Maris
>>>
>>
>>


Re: Multiple output operations in a job vs multiple jobs

2018-07-31 Thread vino yang
Hi anna,

1. The srcstream is a very high volume stream and the window size is 2
weeks and 4 weeks. Is the window size a problem? In this case, I think it
is not a problem because I am using reduce which stores only 1 value per
window. Is that right?

>> The window size depends on your business requirements. However, if the
window size is very large, the state of the job will also be large, which
results in longer recovery after a failure. You need to be aware of this. The
one value per window is just the value computed by the window; depending on
the window function, the window may still buffer all incoming data for the
period before it is triggered.

2. I am having 2 output operations one with 2 weeks window and the other
with 4 weeks window. Are they executed in parallel or in sequence?

>> These two windows are calculated in parallel.

3. When I have multiple output operations like in this case should I break
it into 2 different jobs ?

>> Both approaches are fine. With a single job, the two windows share the
source stream, but this results in a larger job state and slower recovery.
When split into two jobs, Kafka is consumed twice, but the two windows are
independent of each other.

4. Can I run multiple jobs on the same cluster?


>> For standalone cluster mode or a YARN Flink session, etc., there is no
problem. For Flink-on-YARN single-job (per-job) mode, a cluster usually runs
only one job; that per-job mode is the recommended one.
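
For illustration, a minimal sketch of the two-job variant; the Kafka 0.11
connector, topic name, properties and key extraction are assumptions, not
taken from the original code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Job 1: only the 14-day session window; a second main class would look the
// same except for the gap (28 days), the group.id and its own sink.
object TwoWeekJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092")  // illustrative
    props.setProperty("group.id", "two-week-job")         // separate consumer group per job

    val consumer = new FlinkKafkaConsumer011[String]("events", new SimpleStringSchema(), props)
    val srcstream = env.addSource(consumer)

    srcstream
      .map(line => (line.split(",")(0), 1L))              // illustrative key extraction
      .keyBy(0)
      .window(ProcessingTimeSessionWindows.withGap(Time.days(14)))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()                                            // stand-in for the real sink

    env.execute("TwoWeekJob")
  }
}

Each job then has its own, smaller state and can be restarted independently,
at the price of consuming the Kafka topic twice.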

Thanks, vino.

2018-07-31 15:11 GMT+08:00 anna stax :

> Hi all,
>
> I am not sure when I should go for multiple jobs or have 1 job with all
> the sources and sinks. Following is my code.
>
>val env = StreamExecutionEnvironment.getExecutionEnvironment
> ...
> // create a Kafka source
> val srcstream = env.addSource(consumer)
>
> srcstream
>   .keyBy(0)
>   .window(ProcessingTimeSessionWindows.withGap(Time.days(14)))
>   .reduce  ...
>   .map ...
>   .addSink ...
>
> srcstream
>   .keyBy(0)
>   .window(ProcessingTimeSessionWindows.withGap(Time.days(28)))
>   .reduce  ...
>   .map ...
>   .addSink ...
>
> env.execute("Job1")
>
> My questions
>
> 1. The srcstream is a very high volume stream and the window size is 2
> weeks and 4 weeks. Is the window size a problem? In this case, I think it
> is not a problem because I am using reduce which stores only 1 value per
> window. Is that right?
>
> 2. I am having 2 output operations one with 2 weeks window and the other
> with 4 weeks window. Are they executed in parallel or in sequence?
>
> 3. When I have multiple output operations like in this case should I break
> it into 2 different jobs ?
>
> 4. Can I run multiple jobs on the same cluster?
>
> Thanks
>
>
>


Re: Reply: Best way to find the current alive jobmanager with HA mode zookeeper

2018-07-31 Thread Till Rohrmann
I think that the web ui automatically redirects to the current leader. So
if you should access the JobManager which is not leader, then you should
get an HTTP redirect to the current leader. Due to that it should not be
strictly necessary to know which of the JobManagers is the leader.

The RestClusterClient uses the ZooKeeperLeaderRetrievalService to retrieve
the leader address. You could try the same. Using the RestClusterClient
with Flink 1.4 won't work, though. Alternatively, you should be able to
directly read the address from the leader ZNode in ZooKeeper.
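
A rough sketch of reading the leader address directly from ZooKeeper with
Curator; the quorum string and the znode path are assumptions that have to
match your high-availability.zookeeper.* settings, and the exact lock znode
name differs between Flink versions:

import java.io.{ByteArrayInputStream, ObjectInputStream}

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderLookup {
  def main(args: Array[String]): Unit = {
    // Same value as high-availability.zookeeper.quorum
    val client = CuratorFrameworkFactory.newClient(
      "zk1:2181,zk2:2181,zk3:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()

    // Default layout: /<path.root>/<cluster-id>/leader/... ; adjust to your config.
    val data = client.getData.forPath("/flink/default/leader/rest_server_lock")

    // The leader election service writes the leader address with writeUTF,
    // followed by the leader session id.
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try println(s"Current leader: ${in.readUTF()}")
    finally {
      in.close()
      client.close()
    }
  }
}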

Cheers,
Till



On Thu, Jul 26, 2018 at 4:14 AM vino yang  wrote:

> Hi Youjun,
>
> Thanks, you can try this but I am not sure if it works correctly. Because
> for the REST Client, there are quite a few changes from 1.4 to 1.5.
>
> Maybe you can customize the source code of 1.4, referring to the specific
> implementation in 1.5? Another option: upgrade your Flink version.
>
> To Chesnay and Till:  any suggestion or opinion?
>
> Thanks, vino.
>
> 2018-07-26 10:01 GMT+08:00 Yuan,Youjun :
>
>> Thanks for the information. Forgot to mention, I am using Flink 1.4, the
>> RestClusterClient seems don’t have the ability to retrieve the leader
>> address. I did notice there is webMonitorRetrievalService member in Flink
>> 1.5.
>>
>>
>>
>> I wonder if I can use RestClusterClient@v1.5 on my client side, to
>> retrieve the leader JM of Flink v1.4 Cluster.
>>
>>
>>
>> Thanks
>>
>> Youjun
>>
>>
>>
>> *From:* vino yang 
>> *Sent:* Wednesday, July 25, 2018 7:11 PM
>> *To:* Martin Eden 
>> *Cc:* Yuan,Youjun ; user@flink.apache.org
>> *Subject:* Re: Best way to find the current alive jobmanager with HA mode
>> zookeeper
>>
>>
>>
>> Hi Martin,
>>
>>
>>
>>
>>
>> For a standalone cluster with multiple JM instances: if you do
>> not use the REST API but the cluster client provided by Flink, the client can
>> figure out which of the JM instances is the leader.
>>
>>
>>
>> For example, you can use CLI to submit flink job in a non-Leader node.
>>
>>
>>
>> But I did not verify this case for Flink on Mesos.
>>
>>
>>
>> Thanks, vino.
>>
>>
>>
>> 2018-07-25 17:22 GMT+08:00 Martin Eden :
>>
>> Hi,
>>
>>
>>
>> This is actually very relevant to us as well.
>>
>>
>>
>> We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of
>> Mesos/DCOS, Flink HA runs only one JobManager which gets restarted on
>> another node by Marathon in case of failure and re-load it's state from
>> Zookeeper.
>>
>>
>>
>> Yuan I am guessing you are using Flink in standalone mode and there it is
>> actually running 3 instances of the Job Manager, 1 active and 2 stand-bys.
>>
>>
>>
>> Either way, in both cases there is the need to "discover" the hostname
>> and port of the Job Manager at runtime. This is needed when you want to use
>> the cli to submit jobs for instance. Is there an elegant mode to submit
>> jobs other than say just trying out all the possible nodes in your cluster?
>>
>>
>>
>> Grateful if anyone could clarify any of the above, thanks,
>>
>> M
>>
>>
>>
>> On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun 
>> wrote:
>>
>> Hi all,
>>
>>
>>
>> I have a standalone cluster with 3 jobmanagers, and set *high-availability
>> to zookeeper*. Our client submits jobs via the REST API (POST
>> /jars/:jarid/run), which means we need to know the host of any of the
>> currently alive jobmanagers. The problem is: how can we know which job
>> manager is alive, or the host of the current leader? We don't want to access a
>> dead JM.
>>
>>
>>
>> Thanks.
>>
>> Youjun Yuan
>>
>>
>>
>>
>>
>
>


Re: Questions on Unbounded number of keys

2018-07-31 Thread Till Rohrmann
Hi Ashish,

the processing time session windows need to store state in the
StateBackends and I assume that your key space of active windows is
constantly growing. That could explain why you are seeing an ever
increasing memory footprint. But without knowing the input stream and what
the UDFs do this is only a guess.

Cheers,
Till

On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske  wrote:

> Hi Chang,
>
> The state handle objects are not created per key but just once per
> function instance.
> Instead they route state accesses to the backend (JVM heap or RocksDB) for
> the currently active key.
>
> Best, Fabian
>
> 2018-07-30 12:19 GMT+02:00 Chang Liu :
>
>> Hi Andrey,
>>
>> Thanks for your reply. My question might be silly, but there is still one
>> part I would like to fully understand. For example, in the following
>> example:
>>
>> class MyFunction extends KeyedProcessFunction[String, Click, Click] { // 
>> keyed by Session ID
>>   lazy val userId: ValueState[String] = getRuntimeContext.getState(
>> new ValueStateDescriptor[String]("userId", 
>> BasicTypeInfo.STRING_TYPE_INFO))
>>
>>   lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
>> new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))
>>
>>   override def processElement(
>>   click: Click,
>>   context: KeyedProcessFunction[String, Click, Click]#Context,
>>   out: Collector[Click])
>>   : Unit = {
>> // process, output, clear state if necessary
>>   }
>>
>>   override def onTimer(
>>   timestamp: Long,
>>   ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
>>   out: Collector[Click])
>>   : Unit = {
>> // output and clear state
>>   }
>> }
>>
>>
>> Even though I am regularly clearing the two states, userId and clicks
>> (which means I am cleaning up the values stored in the States), my question
>> is: then what about the two State objects themselves: userId and clicks?
>> These States objects are also created per Session ID right? If the number
>> of Session IDs are unbounded, than the number of these State objects are
>> also unbounded.
>>
>> That means, there are *userId-state-1 and clicks-state-1 for
>> session-id-1*, *userId-state-2 and clicks-state-2 for session-id-2*, 
>> *userId-state-3
>> and clicks-state-3 for session-id-3*, …, which are handled by different
>> (or same if two from different *range*, as you call it, are assigned to
>> the same one) keyed operator instance.
>>
>> I am not concerning the actual value in the State (which will be managed
>> carefully, if I am clearing them carefully). I am thinking about the State
>> objects themselves, which I have no idea what is happening to them and what
>> will happen to them.
>>
>> Many thanks :)
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>
>> On 26 Jul 2018, at 10:55, Andrey Zagrebin 
>> wrote:
>>
>> Hi Chang Liu,
>>
>> The unbounded nature of the stream keyed or not should not lead to out of
>> memory.
>>
>> Flink parallel keyed operator instances have fixed number (parallelism)
>> and just process some range of keyed elements, in your example it is a
>> subrange of session ids.
>>
>> The keyed processed elements (http requests) are objects created when
>> they enter the pipeline and garbage collected after having been processed in
>> streaming fashion.
>>
>> If they arrive very rapidly it can lead to high back pressure from
>> upstream to downstream operators, buffers can become full and pipeline
>> stops/slows down processing external inputs, it usually means that your
>> pipeline is under provisioned.
>>
>> The only accumulated data comes from state (windows, user state etc), so
>> if you control its memory consumption, as Till described, there should be
>> no other source of out of memory.
>>
>> Cheers,
>> Andrey
>>
>> On 25 Jul 2018, at 19:06, Chang Liu  wrote:
>>
>> Hi Till,
>>
>> Thanks for your reply. But I think maybe I did not make my question
>> clear. My question is not about whether the States within each keyed
>> operator instances will run out of memory. My question is about, whether
>> the unlimited keyed operator instances themselves will run out of memory.
>>
>> So to reply to your answers, no matter using different State backends or
>> regularly cleaning up the States (which is exactly what I am doing), it
>> does not concern the number of keyed operator instances.
>>
>> I would like to know:
>>
>>- Will the number of keyed operator instances (Java objects?) grow
>>unbounded?
>>- If so, will they run out of memory? This is not actually related to
>>the memory used by the keyed Stated inside.
>>- If not, then how Flink is managing this multiple keyed operator
>>instances?
>>
>>
>> I think this needs more knowledge about how Flink works internally to
>> understand how keyed operator instances are created, maintained and
>> destroyed. That’s why I would like your help understanding this.
>>
>> Many Thanks.
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>
>> On

Re: watermark VS window trigger

2018-07-31 Thread Fabian Hueske
Hi,

Watermarks are not holding back records. Instead they define the event-time
at an operator (as Vino said) and can trigger the processing of data if the
logic of an operator is based on time.
For example, a window operator can emit complete results for a window once
the time passed the window's end timestamp.
Operators that do not act on time, such as mappers or filters, emit records
as soon as possible without waiting for watermarks.
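
As a small, self-contained sketch of this behaviour (the element values and
window size are made up): the map() below forwards every record immediately,
while the 10-second tumbling event-time window only emits once a watermark
passes the window's end timestamp.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WatermarkWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // (key, event timestamp in ms)
    val events = env.fromElements(("a", 1000L), ("a", 4000L), ("a", 12000L))
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(2)) {
          override def extractTimestamp(e: (String, Long)): Long = e._2
        })

    events
      .map(e => e)                  // a non-time-based operator: emits right away
      .keyBy(_._1)
      .timeWindow(Time.seconds(10)) // window [0, 10000) fires once the watermark passes 9999
      .reduce((a, b) => (a._1, math.max(a._2, b._2)))
      .print()

    env.execute("watermark-window-sketch")
  }
}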

Best, Fabian

2018-07-30 11:37 GMT+02:00 vino yang :

> Hi Soheil,
>
> I feel that some of your understanding is a bit problematic.
>
> *"After that according to the current watermark, data with the timestamp
> between the last watermark and current watermark will be released and go to
> the next steps"*
>
> The main role of the watermark is to define the progress of event time,
> which serves as the time basis on which the window is triggered. Upstream of
> the window operator, watermarks are only generated at a specific interval,
> and as they flow downstream they advance the event time of the downstream
> tasks.
>
> You can read "event time & watermark" documentation [1].
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-
> master/dev/event_time.html
>
> Thanks, vino.
>
>
> 2018-07-30 17:05 GMT+08:00 Soheil Pourbafrani :
>
>> Suppose we have a time window of 10 milliseconds and we use EventTime.
>> First, we determine how Flink can get time and watermark from
>> incoming messages, after that, we set a key for the stream and set a time
>> window.
>>
>> aggregatedTuple
>> .assignTimestampsAndWatermarks(new 
>> SampleTimestampExtractor())
>> 
>> .keyBy(1).timeWindow(Time.milliseconds(1000))/*.countWindow(3)*/
>> .reduce()
>>
>> My understanding of the data flow in this scenario is the following:
>>
>> Flink advanced time according to the timestamp of Incoming data into the
>> aggregatedTuple variable while for each message get the timestamp and
>> watermark.
>> As I use Periodic Watermarks, according to default watermark interval
>> (200ms), watermarks will be updated. After that according to the
>> current watermark, data with the timestamp between the last watermark and
>> current watermark will be released and go to the next steps (keyBy,
>> timeWindow, reduce). If Flink received a data but an appropriate watermark
>> didn't emit for that data yet, Flink didn't send that data to the next
>> steps and keep it until it's appropriate watermark will be emitted.
>>
>> Is that correct?
>>
>
>


Re: Logs are not easy to read through webUI

2018-07-31 Thread Till Rohrmann
Hi Xinyu,

thanks for starting this discussion. I think you should open a JIRA issue
for this feature. I can see the benefit of such a feature if the
DailyRollingAppender is activated.

Cheers,
Till

On Mon, Jul 30, 2018 at 1:47 PM vino yang  wrote:

> Hi Xinyu,
>
> Thanks for your suggestion. I will CC this suggestion to some PMC members
> of Flink.
>
> Thanks, vino.
>
> 2018-07-30 18:03 GMT+08:00 Xinyu Zhang :
>
>> Hi vino
>>
>> Yes, it's only from the perspective of performance of reading log or
>> metrics.  If the logs with timestamps(e.g. jobmanager.log.2018-07-29) will
>> never change, maybe blob store can cache some of them to improve
>> performance.
>>
>> BTW, please consider developing an API for reading logs. I think many
>> flink users meet this problem.
>>
>> Thanks!
>>
>> Xinyu Zhang
>>
>>
>> On Monday, July 30, 2018, vino yang wrote:
>>
>>> Hi Xinyu,
>>>
>>> Thank you for your clarification on "periodic reading". If Flink
>>> considers developing an API for reading logs, I think this is a good idea.
>>>
>>> Regarding the problem of TM reading logs, your idea is good from a
>>> performance perspective.
>>> But Flink didn't provide any web services for the TM from the beginning.
>>> All the requests were passed through the JM proxy.
>>> Just because of the log read performance changes, there will be major
>>> changes to the architecture, and this will make the TM take on more
>>> responsibilities.
>>>
>>> Thanks, vino.
>>>
>>>
>>> 2018-07-30 17:34 GMT+08:00 Xinyu Zhang :
>>>
 Thanks for your reply.  "periodic reading" means reading all logs in a
 given time interval. For example, my logs is daily divided, I can get all
 logs of yesterday through a parameter like '2018-07-29/2018-07-30'.

 TM which provides a web service to display information will lessen the
 burden of jobmanager, especially when there are many taskManagers in the
 flink cluster.


 On Monday, July 30, 2018, vino yang wrote:

> Hi Xinyu,
>
> This is indeed a problem. Especially when the amount of logs is large,
> it may even cause the UI to stall for a long time. The same is true for
> YRAN, and there is really no good way to do it at the moment.
> Thank you for your suggestion, do you mean "periodic reading" refers
> to full or incremental? If it is a full reading, I personally do not
> recommend this, which will increase the burden on JM and client, and it is
> appropriate to manually trigger it by the user. Maybe we can consider
> loading only a few logs at a time, such as incremental reads, paged
> displays, and so on.
> Regarding the second question, Flink did this because its TM does not
> provide any web services to display information, and the Web UI is
> currently bundled with JM.
>
> Thanks, vino.
>
> 2018-07-30 16:33 GMT+08:00 Xinyu Zhang :
>
>> Hi all
>>
>> We use flink on yarn and flink version is 1.4.
>>
>> When a streaming job run for a long time, the webUI cannot show logs.
>> This may be becasue the log size is too large.
>>
>> However, if we use the DailyRollingAppender to divide logs
>> (granularity is `day`) in log4j.properties, we will never see the log of
>> yesterday.
>>
>> Is there any ideas can make read logs easier?
>>
>> Maybe, we should add an interface that support for reading log by
>> time interval. Besides, when we get the taskmanager logs through webUI,
>> jobmanager can redirect to a URL of the taskmanager, which users can get
>> the logs directly (Just like MR task), other than downloading the logs 
>> from
>> taskmanager and then sending logs to users.
>>
>> Thanks!
>>
>> Xinyu Zhang
>>
>>
>>
>
>>>
>


[ANNOUNCE] Apache Flink 1.5.2 released

2018-07-31 Thread Chesnay Schepler
The Apache Flink community is very happy to announce the release of 
Apache Flink 1.5.2, which is the second bugfix release for the Apache 
Flink 1.5 series.


Apache Flink® is an open-source stream processing framework for 
distributed, high-performing, always-available, and accurate data 
streaming applications.


The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the 
improvements for this bugfix release:

https://flink.apache.org/news/2018/07/31/release-1.5.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343588

We would like to thank all contributors of the Apache Flink community 
who made this release possible!


Regards,
Chesnay


Re: Small-files source - partitioning based on prefix of file

2018-07-31 Thread Fabian Hueske
Hi Averell,

The records emitted by the monitoring tasks are "just" file splits, i.e.,
meta information that defines which data to read from where.
The reader tasks receive these splits and process them by reading the
corresponding files.

You could of course partition the splits based on the file name (or
whatever attribute) however, this is not the only thing you need to change
if you want to have a fault tolerant setup.
A reader task stores the splits that it hasn't processed yet in operator
state, which is randomly redistributed when the operator recovers from a
failure (or when rescaling the application).
You would need to change the logic of the reader task as well to ensure
that the splits are deterministically assigned to reader tasks.

TBH, I would just add a keyBy() after the source. Since the monitoring
task just emits metadata, the data won't be shuffled twice.
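
A minimal sketch of that suggestion, assuming the standard continuous file
source and that each record carries (or can be parsed into) the attribute you
want to key on; the path and the key extraction are illustrative:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object SmallFilesKeyBySketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val dir = "hdfs:///data/incoming"              // illustrative path
    val format = new TextInputFormat(new Path(dir))

    // The monitoring task only emits splits; the reader tasks produce the lines.
    val lines: DataStream[String] = env.readFile(
      format, dir, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)

    lines
      .keyBy(line => line.split(",")(0))           // illustrative: key on the first field
      .map(_.toUpperCase)                          // stand-in for the real keyed processing
      .print()

    env.execute("small-files-keyby-sketch")
  }
}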

Best, Fabian

2018-07-31 6:54 GMT+02:00 vino yang :

> Hi Averell,
>
> Actually, performing a key partition inside the source function is the
> same as DataStream[Source].keyBy(custom partitioner), because keyBy is
> not a real operator but a virtual node in the DAG, which does not correspond
> to a physical operator.
>
> Thanks, vino.
>
> 2018-07-31 10:52 GMT+08:00 Averell :
>
>> Hi Vino,
>>
>> I'm a little bit confused.
>> If I can do the partitioning from within the source function, using the
>> same
>> hash function on the key to identify the partition, would that be
>> sufficient
>> to avoid shuffling in the next byKey call?
>>
>> Thanks.
>> Averell
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Event time didn't advance because of some idle slots

2018-07-31 Thread Fabian Hueske
Hi,

If you are using a custom source, you can call
SourceContext.markAsTemporarilyIdle() to indicate that a task is currently
not producing new records [1].
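
A minimal sketch of a custom source that uses this, where poll() is a
placeholder for your own input logic:

import org.apache.flink.streaming.api.functions.source.SourceFunction

class IdleAwareSource extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
      poll() match {
        case Some(record) =>
          ctx.collect(record)
        case None =>
          // Nothing to emit right now: downstream operators will ignore this
          // subtask's watermark until it produces data again.
          ctx.markAsTemporarilyIdle()
          Thread.sleep(1000)
      }
    }
  }

  override def cancel(): Unit = running = false

  // Placeholder: fetch the next record from the external system, if any.
  private def poll(): Option[String] = None
}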

Best, Fabian

2018-07-31 8:50 GMT+02:00 Reza Sameei :

> It's not a real solution; but why you don't change the parallelism for
> your `SourceFunction`?
>
> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani 
> wrote:
>
>> In Flink Event time mode, I use the periodic watermark to advance event
>> time. Every slot extract event time from the incoming message and to emit
>> watermark, subtract it a network delay, say 3000ms.
>>
>> public Watermark getCurrentWatermark() {
>> return new Watermark(MAX_TIMESTAMP - DELEY);
>> }
>>
>> I have 4 active slots. The problem is just two slots get incoming data
>> but all of them call the method getCurrentWatermark(). So in this
>> situation consider a case that thread 1 and 2 get incoming data and thread
>> 3 and 4 will not.
>>
>> Thread-1-watermark ---> 1541217659806
>> Thread-2-watermark ---> 1541217659810
>> Thread-3-watermark ---> (0 - 3000 = -3000)
>> Thread-4-watermark ---> (0 - 3000 = -3000)
>>
>> So as Flink set the lowest watermark as the general watermark, time
>> doesn't go on! If I change the getCurrentWatermark() method as:
>>
>> public Watermark getCurrentWatermark() {
>> return new Watermark(System.currentTimeMillis() - DELEY);
>> }
>>
>> it will solve the problem, but I don't want to use machine's timestamp!
>> How can I fix the problem?
>>
>>
>
> --
> رضا سامعی  | Reza Sameei | Software Developer | 09126662695
>


Re: Event time didn't advance because of some idle slots

2018-07-31 Thread Hequn Cheng
Hi Soheil,

You can set parallelism to 1 to solve the problem.
Or use markAsTemporarilyIdle() as Fabian said (the link is probably
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
line 639).

On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske  wrote:

> Hi,
>
> If you are using a custom source, you can call SourceContext.
> markAsTemporarilyIdle() to indicate that a task is currently not
> producing new records [1].
>
> Best, Fabian
>
> 2018-07-31 8:50 GMT+02:00 Reza Sameei :
>
>> It's not a real solution; but why you don't change the parallelism for
>> your `SourceFunction`?
>>
>> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani <
>> soheil.i...@gmail.com> wrote:
>>
>>> In Flink Event time mode, I use the periodic watermark to advance event
>>> time. Every slot extract event time from the incoming message and to emit
>>> watermark, subtract it a network delay, say 3000ms.
>>>
>>> public Watermark getCurrentWatermark() {
>>> return new Watermark(MAX_TIMESTAMP - DELEY);
>>> }
>>>
>>> I have 4 active slots. The problem is just two slots get incoming data
>>> but all of them call the method getCurrentWatermark(). So in this
>>> situation consider a case that thread 1 and 2 get incoming data and thread
>>> 3 and 4 will not.
>>>
>>> Thread-1-watermark ---> 1541217659806
>>> Thread-2-watermark ---> 1541217659810
>>> Thread-3-watermark ---> (0 - 3000 = -3000)
>>> Thread-4-watermark ---> (0 - 3000 = -3000)
>>>
>>> So as Flink set the lowest watermark as the general watermark, time
>>> doesn't go on! If I change the getCurrentWatermark() method as:
>>>
>>> public Watermark getCurrentWatermark() {
>>> return new Watermark(System.currentTimeMillis() - DELEY);
>>> }
>>>
>>> it will solve the problem, but I don't want to use machine's timestamp!
>>> How can I fix the problem?
>>>
>>>
>>
>> --
>> رضا سامعی  | Reza Sameei | Software Developer | 09126662695
>>
>
>


Process with guava cache

2018-07-31 Thread Juan Gentile
Hello!

I’m trying to have a process function with a cache (using Guava), following this:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html

But when I run it I get the following exception:


com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException

Serialization trace:

localCache 
(com.criteo.internal.shaded.com.google.common.cache.LocalCache$LocalManualCache)

at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)

at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)

at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:272)

at 
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)

at com.criteo.flink.CachedOutput.processElement(CachedOutput.scala:28)

at com.criteo.flink.CachedOutput.processElement(CachedOutput.scala:16)

at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)

at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)

at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.NullPointerException

at 
com.criteo.internal.shaded.com.google.common.cache.LocalCache.hash(LocalCache.java:1839)

at 
com.criteo.internal.shaded.com.google.common.cache.LocalCache.put(LocalCache.java:4148)

at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)

at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)

at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)

Which I guess is a problem when trying to (de)serialize the state in order to update it?
Do you know how I could solve this?

Btw, I’m trying not to output records that were already processed while using a sliding 
window.

Thanks!
Juan
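
One common way to avoid this error is to never put the Guava cache itself into
Flink state: keep the cache as a transient field that is rebuilt in open(),
and store only plain values in keyed state. A minimal sketch, assuming
unshaded Guava and string records; the class and field names are illustrative,
not the code from this thread:

import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class DedupFunction extends KeyedProcessFunction[String, String, String] {

  // Rebuilt on every (re)start, never serialized by Flink.
  @transient private var localCache: Cache[String, java.lang.Boolean] = _
  private var lastEmitted: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    localCache = CacheBuilder.newBuilder().maximumSize(10000).build[String, java.lang.Boolean]()
    lastEmitted = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastEmitted", classOf[String]))
  }

  override def processElement(value: String,
                              ctx: KeyedProcessFunction[String, String, String]#Context,
                              out: Collector[String]): Unit = {
    // The cache is a best-effort, per-subtask shortcut; the keyed state holds
    // the durable, checkpointed information.
    if (localCache.getIfPresent(value) == null && value != lastEmitted.value()) {
      out.collect(value)
      lastEmitted.update(value)
      localCache.put(value, java.lang.Boolean.TRUE)
    }
  }
}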


scala cep with event time

2018-07-31 Thread 孙森
Hi,Fabian

  I am using the Flink CEP library with event time, but there is no output (the 
Java code performed as expected, but the Scala code did not). My code is here:


object EventTimeTest extends App {
  val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.createLocalEnvironment()
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val input: DataStream[Event] = env.fromElements(new Event(1, "aa", 
DateUtils.dt2timestamp("2018-05-14 10:29:15.00")),
new Event(1, "ab", DateUtils.dt2timestamp("2018-05-14 10:29:25.00")),
new Event(3, "ac", DateUtils.dt2timestamp("2018-05-14 10:29:35.00")),
new Event(4, "ad", DateUtils.dt2timestamp("2018-05-14 10:29:45.00")),
new Event(5, "ae", DateUtils.dt2timestamp("2018-05-14 10:29:55.00")))


  input.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(100)) {
  override def extractTimestamp(element: Event): Long = {
val ts = DateUtils.dt2long(element.getTs)
println(ts)
ts
  }
}).setParallelism(1)

  val partitionedInput: KeyedStream[Event, Long] = input.keyBy(event => 
event.getId)

  val pattern: Pattern[Event, Event] = Pattern.begin("start")
.subtype(classOf[Event])
.where(_.getName.startsWith("a")).within(Time.seconds(30))

  val patternStream = CEP.pattern(partitionedInput, pattern)

  val alerts = patternStream.select(patternSelectFun => {
val startEvent: Event = patternSelectFun("start").head
println(startEvent.getName)
startEvent.getName
  })


  alerts.print()

  env.execute("start")

}



The java code is :



public class EventTest {

public static void main(String[] s) {

LocalStreamEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        DataStream<Event> input = env.fromElements(
new Event(1, "aa", "2018-05-14 10:29:15"),
new Event(1, "ab", "2018-05-14 10:29:25"),
new Event(3, "ac", "2018-05-14 10:29:35"),
new Event(4, "ad", "2018-05-14 10:29:45"),
new Event(5, "ae", "2018-05-14 10:29:55"));


        DataStream<Event> withTimestampsAndWatermarks =
            input.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
@Override
public long extractTimestamp(Event element) {
try {
Date dt = formatter.parse(element.umsTs);
return dt.getTime();
} catch (ParseException e) {
e.printStackTrace();
return 0;
}
}
});

        KeyedStream<Event, Long> partitionedInput = 
withTimestampsAndWatermarks.keyBy(new KeySelector<Event, Long>() {
public Long getKey(Event e) {
return e.id;
}
});

        Pattern<Event, Event> pattern = Pattern.<Event>begin("start")
.subtype(Event.class)
                .where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.name.startsWith("a");
}
}).within(Time.seconds(30));

        PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

        DataStream<List<Event>> alerts = patternStream.select(
                new PatternSelectFunction<Event, List<Event>>() {
@Override
                    public List<Event> select(Map<String, List<Event>> pattern) {
                        List<Event> startEvent = pattern.get("start");
System.out.println("name:"+startEvent.get(0).name);
return startEvent;
}
}
);
alerts.print();

try {
env.execute("start");
} catch (Exception e) {
e.printStackTrace();
}


}


}




Thanks!
sensun


Re: Event time didn't advance because of some idle slots

2018-07-31 Thread vino yang
Hi Soheil,

The documentation of markAsTemporarilyIdle method is here :
https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html#markAsTemporarilyIdle--

Thanks, vino.

2018-07-31 17:14 GMT+08:00 Hequn Cheng :

> Hi Soheil,
>
> You can set parallelism to 1 to solve the problem.
> Or use markAsTemporarilyIdle() as Fabian said(the link maybe is
> https://github.com/apache/flink/blob/master/flink-
> connectors/flink-connector-kafka-base/src/main/java/org/
> apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
> line639).
>
> On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> If you are using a custom source, you can call
>> SourceContext.markAsTemporarilyIdle() to indicate that a task is
>> currently not producing new records [1].
>>
>> Best, Fabian
>>
>> 2018-07-31 8:50 GMT+02:00 Reza Sameei :
>>
>>> It's not a real solution; but why you don't change the parallelism for
>>> your `SourceFunction`?
>>>
>>> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani <
>>> soheil.i...@gmail.com> wrote:
>>>
 In Flink Event time mode, I use the periodic watermark to advance event
 time. Every slot extract event time from the incoming message and to emit
 watermark, subtract it a network delay, say 3000ms.

 public Watermark getCurrentWatermark() {
 return new Watermark(MAX_TIMESTAMP - DELEY);
 }

 I have 4 active slots. The problem is just two slots get incoming data
 but all of them call the method getCurrentWatermark(). So in this
 situation consider a case that thread 1 and 2 get incoming data and thread
 3 and 4 will not.

 Thread-1-watermark ---> 1541217659806
 Thread-2-watermark ---> 1541217659810
 Thread-3-watermark ---> (0 - 3000 = -3000)
 Thread-4-watermark ---> (0 - 3000 = -3000)

 So as Flink set the lowest watermark as the general watermark, time
 doesn't go on! If I change the getCurrentWatermark() method as:

 public Watermark getCurrentWatermark() {
 return new Watermark(System.currentTimeMillis() - DELEY);
 }

 it will solve the problem, but I don't want to use machine's timestamp!
 How can I fix the problem?


>>>
>>> --
>>> رضا سامعی  | Reza Sameei | Software Developer | 09126662695
>>>
>>
>>
>


Re: Event time didn't advance because of some idle slots

2018-07-31 Thread vino yang
Hi Soheil,

Hequn has given you the usage of this method, see here :
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L639

Thanks, vino.

2018-07-31 17:56 GMT+08:00 Soheil Pourbafrani :

> Hi vino,
>
> Could you please show markAsTemporarilyIdle() usage with a simple example?
> Thanks
>
> On Tue, Jul 31, 2018 at 2:10 PM, vino yang  wrote:
>
>> Hi Soheil,
>>
>> The documentation of markAsTemporarilyIdle method is here :
>> https://ci.apache.org/projects/flink/flink-docs-release-1.
>> 5/api/java/org/apache/flink/streaming/api/functions/
>> source/SourceFunction.SourceContext.html#markAsTemporarilyIdle--
>>
>> Thanks, vino.
>>
>> 2018-07-31 17:14 GMT+08:00 Hequn Cheng :
>>
>>> Hi Soheil,
>>>
>>> You can set parallelism to 1 to solve the problem.
>>> Or use markAsTemporarilyIdle() as Fabian said(the link maybe is
>>> https://github.com/apache/flink/blob/master/flink-connectors
>>> /flink-connector-kafka-base/src/main/java/org/apache/flink/
>>> streaming/connectors/kafka/FlinkKafkaConsumerBase.java line639).
>>>
>>> On Tue, Jul 31, 2018 at 4:51 PM, Fabian Hueske 
>>> wrote:
>>>
 Hi,

 If you are using a custom source, you can call
 SourceContext.markAsTemporarilyIdle() to indicate that a task is
 currently not producing new records [1].

 Best, Fabian

 2018-07-31 8:50 GMT+02:00 Reza Sameei :

> It's not a real solution; but why you don't change the parallelism for
> your `SourceFunction`?
>
> On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani <
> soheil.i...@gmail.com> wrote:
>
>> In Flink Event time mode, I use the periodic watermark to advance
>> event time. Every slot extract event time from the incoming message and 
>> to
>> emit watermark, subtract it a network delay, say 3000ms.
>>
>> public Watermark getCurrentWatermark() {
>> return new Watermark(MAX_TIMESTAMP - DELEY);
>> }
>>
>> I have 4 active slots. The problem is just two slots get incoming
>> data but all of them call the method getCurrentWatermark(). So in
>> this situation consider a case that thread 1 and 2 get incoming data and
>> thread 3 and 4 will not.
>>
>> Thread-1-watermark ---> 1541217659806
>> Thread-2-watermark ---> 1541217659810
>> Thread-3-watermark ---> (0 - 3000 = -3000)
>> Thread-4-watermark ---> (0 - 3000 = -3000)
>>
>> So as Flink set the lowest watermark as the general watermark, time
>> doesn't go on! If I change the getCurrentWatermark() method as:
>>
>> public Watermark getCurrentWatermark() {
>> return new Watermark(System.currentTimeMillis() - DELEY);
>> }
>>
>> it will solve the problem, but I don't want to use machine's
>> timestamp! How can I fix the problem?
>>
>>
>
> --
> رضا سامعی  | Reza Sameei | Software Developer | 09126662695
>


>>>
>>
>


Re: Committing Kafka Transactions during Savepoint

2018-07-31 Thread Aljoscha Krettek
Hi Scott,

Some more clarifications:

Doing a stop-with-savepoint will suspend the checkpoint coordinator, meaning 
that no new checkpoints will happen between taking the savepoint and shutting 
down the job. This means you will be safe from duplicates if you only use 
savepoints for this.

Regarding committing of the transactions: they might be committed but they 
probably won't be because there is no mechanism that ensures side effects of 
completed checkpoints are effected before shutting down the job after taking 
the savepoint. The transactional sinks work like this: 1) do checkpoint, where 
we prepare the transaction, notify checkpoint coordinator that our checkpoint 
is "complete" 2) wait for message from checkpoint coordinator that all 
checkpoints (from all parallel operators) are complete 3) commit the 
transaction. That last step is currently not guaranteed to happen when stopping 
with a savepoint. However, when restarting a job from a savepoint the sink 
will check if there are any open transactions that should have been committed 
(it knows that because they are stored in state) and then commits them.

This works but is a bit fragile, so it's high on my list of things I want to see 
fixed in Flink 1.7.
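
For completeness, a short sketch of wiring up the transactional Kafka 0.11
producer with EXACTLY_ONCE semantics as described above; the topic, brokers,
checkpoint interval and transaction timeout are illustrative values:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExactlyOnceSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Transactions are committed when a checkpoint completes.
    env.enableCheckpointing(60000)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092")
    // Must be larger than the maximum expected checkpoint duration + pause.
    props.setProperty("transaction.timeout.ms", "900000")

    val producer = new FlinkKafkaProducer011[String](
      "output-topic",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      props,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

    env.fromElements("a", "b", "c").addSink(producer)
    env.execute("exactly-once-sink-sketch")
  }
}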

Best,
Aljoscha

> On 30. Jul 2018, at 17:34, vino yang  wrote:
> 
> Hi Scott,
> 
> For EXACTLY_ONCE on the sink side with the Kafka 0.11+ producer, the answer is YES.
> There is official documentation that gives a good overview of this 
> topic [1].
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
>  
> 
> 
> Thanks, vino.
> 
> 
> 
> 2018-07-27 22:53 GMT+08:00 Scott Kidder  >:
> Thank you, Aljoscha! Are Kafka transactions committed when a running job has 
> been instructed to cancel with a savepoint (e.g. `flink cancel -s `)? 
> This is my primary use for savepoints. I would expect that when a new job is 
> submitted with the savepoint, as in the case of an application upgrade, Flink 
> will create a new Kafka transaction and processing will be exactly-once.
> 
> --Scott Kidder
> 
> On Fri, Jul 27, 2018 at 5:09 AM Aljoscha Krettek  > wrote:
> Hi,
> 
> this has been in the back of my head for a while now. I finally created a 
> Jira issue: https://issues.apache.org/jira/browse/FLINK-9983 
> 
> 
> In there, I also outline a better fix that will take a bit longer to 
> implement.
> 
> Best,
> Aljoscha
> 
>> On 26. Jul 2018, at 23:04, Scott Kidder > > wrote:
>> 
>> I recently began using the exactly-once processing semantic with the Kafka 
>> 0.11 producer in Flink 1.4.2. It's been working great!
>> 
>> Are Kafka transactions committed when creating a Flink savepoint? How does 
>> this affect the recovery behavior in Flink if, before the completion of the 
>> next checkpoint, the application is restarted and restores from a checkpoint 
>> taken before the savepoint? It seems like this might lead to the Kafka 
>> producer writing a message multiple times with different committed Kafka 
>> transactions.
>> 
>> --
>> Scott Kidder
> 
> 



Re: Questions on Unbounded number of keys

2018-07-31 Thread ashish pok
Hi Till,
Keys are unbounded (a group of events shares the same key, but that key doesn't 
repeat after its window has fired, other than for some odd delayed events). So 
basically there is 1 key aligned to each window. When you say key space of 
active windows, does that include keys for windows that have already fired and 
could still be in the memory footprint? If so, that is basically the problem I 
would run into, and I am looking for a solution to clean up. Like I said earlier, 
overriding the trigger to FIRE_AND_PURGE did not help. If I take the same stream 
and key and refactor it to how Chang is doing it with a ProcessFunction, the 
issue goes away.
If you mean only the currently processing key space of active windows (not the 
ones that have already fired), then I would say that cannot be the case. We are 
getting the data from periodic polls of the same number of devices, and the 
uniqueness of a key is simply a time identifier prefixed to a device identifier. 
Even though there could be a little delayed data, the chances of the number of 
unique keys growing constantly for days are practically none, as the device list 
is constant.
Thanks, Ashish


- Ashish

On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann  wrote:

Hi Ashish,
the processing time session windows need to store state in the StateBackends 
and I assume that your key space of active windows is constantly growing. That 
could explain why you are seeing an ever increasing memory footprint. But 
without knowing the input stream and what the UDFs do this is only a guess.
Cheers,Till
On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske  wrote:

Hi Chang,
The state handle objects are not created per key but just once per function 
instance.
Instead they route state accesses to the backend (JVM heap or RocksDB) for the 
currently active key.
Best, Fabian

2018-07-30 12:19 GMT+02:00 Chang Liu :

Hi Andrey,
Thanks for your reply. My question might be silly, but there is still one part 
I would like to fully understand. For example, in the following example:
class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed 
by Session ID
  lazy val userId: ValueState[String] = getRuntimeContext.getState(
new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))

  lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))

  override def processElement(
  click: Click,
  context: KeyedProcessFunction[String, Click, Click]#Context,
  out: Collector[Click])
  : Unit = {
// process, output, clear state if necessary
  }

  override def onTimer(
  timestamp: Long,
  ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
  out: Collector[Click])
  : Unit = {
// output and clear state
  }
}
Even though I am regularly clearing the two states, userId and clicks (which 
means I am cleaning up the values stored in the States), my question is: then 
what about the two State objects themselves: userId and clicks?  These States 
objects are also created per Session ID right? If the number of Session IDs are 
unbounded, than the number of these State objects are also unbounded.
That means, there are userId-state-1 and clicks-state-1 for session-id-1, 
userId-state-2 and clicks-state-2 for session-id-2, userId-state-3 and 
clicks-state-3 for session-id-3, …, which are handled by different (or same if 
two from different range, as you call it, are assigned to the same one) keyed 
operator instance.
I am not concerning the actual value in the State (which will be managed 
carefully, if I am clearing them carefully). I am thinking about the State 
objects themselves, which I have no idea what is happening to them and what 
will happen to them.
Many thanks :)
Best regards/祝好,

Chang Liu 刘畅



On 26 Jul 2018, at 10:55, Andrey Zagrebin  wrote:
Hi Chang Liu,
The unbounded nature of the stream keyed or not should not lead to out of 
memory. 
Flink parallel keyed operator instances have fixed number (parallelism) and 
just process some range of keyed elements, in your example it is a subrange of 
session ids. 
The keyed processed elements (http requests) are objects created when they 
enter the pipeline and garage collected after having been processed in 
streaming fashion. 
If they arrive very rapidly it can lead to high back pressure from upstream to 
downstream operators, buffers can become full and pipeline stops/slows down 
processing external inputs, it usually means that your pipeline is under 
provisioned. 
The only accumulated data comes from state (windows, user state etc), so if you 
control its memory consumption, as Till described, there should be no other 
source of out of memory.
Cheers,Andrey

On 25 Jul 2018, at 19:06, Chang Liu  wrote:
Hi Till,
Thanks for your reply. But I think maybe I did not make my question clear. My 
question is not about whether the States within each keyed operator instances 
will run out of memory. My question is about, whether the unlimited keyed 

Re: Questions on Unbounded number of keys

2018-07-31 Thread Till Rohrmann
Hi Ashish,

FIRE_AND_PURGE should also clear the window state. Yes, by active windows I
mean windows which have not been purged yet.

Maybe Aljoscha knows more about why the window state is growing (I would
not rule out a bug).
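
For reference, a minimal, illustrative sketch of what a purging trigger for
processing-time session windows can look like (it is not the code from this
thread and does not explain the growth Ashish reports; session windows merge,
so the trigger also has to support merging):

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class PurgingProcessingTimeTrigger extends Trigger[Any, TimeWindow] {

  override def onElement(element: Any, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // Fire when processing time reaches the end of the (session) window.
    ctx.registerProcessingTimeTimer(window.maxTimestamp())
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE // emit the result and drop the window state

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // Session windows are merging windows, so the trigger must support merging.
  override def canMerge(): Boolean = true

  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit =
    ctx.registerProcessingTimeTimer(window.maxTimestamp())

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteProcessingTimeTimer(window.maxTimestamp())
}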

Cheers,
Till

On Tue, Jul 31, 2018 at 1:45 PM ashish pok  wrote:

> Hi Till,
>
> Keys are unbounded (a group of events have same key but that key doesnt
> repeat after it is fired other than some odd delayed events). So basically
> there 1 key that will be aligned to a window. When you say key space of
> active windows, does that include keys for windows that have already fired
> and could be in memory footprint? If so, that is basically the problem I
> would get into and looking for a solution to clean-up. Like I said earlier
> overriding tigger to FIRE_AND_PURGE did not help. If I take the same stream
> and key and refactor it to how Chang is doing it with Process Function,
> issue goes away.
>
> If you mean only currently processing key space of active windows (not the
> ones that have already fired)  then I would say, that cannot be the case.
> We are getting the data from period poll of same number of devices and
> uniqueness of key is simply a time identifier prefixed to device
> identifier. Even though there could be a little delayed data, the chances
> of number of unique keys growing constantly for days is probably none as
> device list is constant.
>
> Thanks, Ashish
>
>
> - Ashish
>
> On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann 
> wrote:
>
> Hi Ashish,
>
> the processing time session windows need to store state in the
> StateBackends and I assume that your key space of active windows is
> constantly growing. That could explain why you are seeing an ever
> increasing memory footprint. But without knowing the input stream and what
> the UDFs do this is only a guess.
>
> Cheers,
> Till
>
> On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske  wrote:
>
> Hi Chang,
>
> The state handle objects are not created per key but just once per
> function instance.
> Instead they route state accesses to the backend (JVM heap or RocksDB) for
> the currently active key.
>
> Best, Fabian
>
> 2018-07-30 12:19 GMT+02:00 Chang Liu :
>
> Hi Andrey,
>
> Thanks for your reply. My question might be silly, but there is still one
> part I would like to fully understand. For example, in the following
> example:
>
> class MyFunction extends KeyedProcessFunction[String, Click, Click] { // 
> keyed by Session ID
>   lazy val userId: ValueState[String] = getRuntimeContext.getState(
> new ValueStateDescriptor[String]("userId", 
> BasicTypeInfo.STRING_TYPE_INFO))
>
>   lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
> new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))
>
>   override def processElement(
>   click: Click,
>   context: KeyedProcessFunction[String, Click, Click]#Context,
>   out: Collector[Click])
>   : Unit = {
> // process, output, clear state if necessary
>   }
>
>   override def onTimer(
>   timestamp: Long,
>   ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
>   out: Collector[Click])
>   : Unit = {
> // output and clear state
>   }
> }
>
>
> Even though I am regularly clearing the two states, userId and clicks
> (which means I am cleaning up the values stored in the States), my question
> is: then what about the two State objects themselves: userId and clicks?
> These States objects are also created per Session ID right? If the number
> of Session IDs are unbounded, than the number of these State objects are
> also unbounded.
>
> That means, there are *userId-state-1 and clicks-state-1 for session-id-1*,
> *userId-state-2 and clicks-state-2 for session-id-2*, *userId-state-3 and
> clicks-state-3 for session-id-3*, …, which are handled by different (or
> same if two from different *range*, as you call it, are assigned to the
> same one) keyed operator instance.
>
> I am not concerning the actual value in the State (which will be managed
> carefully, if I am clearing them carefully). I am thinking about the State
> objects themselves, which I have no idea what is happening to them and what
> will happen to them.
>
> Many thanks :)
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
> On 26 Jul 2018, at 10:55, Andrey Zagrebin 
> wrote:
>
> Hi Chang Liu,
>
> The unbounded nature of the stream keyed or not should not lead to out of
> memory.
>
> Flink parallel keyed operator instances have fixed number (parallelism)
> and just process some range of keyed elements, in your example it is a
> subrange of session ids.
>
> The keyed processed elements (http requests) are objects created when they
> enter the pipeline and garage collected after having been processed in
> streaming fashion.
>
> If they arrive very rapidly it can lead to high back pressure from
> upstream to downstream operators, buffers can become full and pipeline
> stops/slows down processing external inputs, it usually means that your
> pipe

Re: Small-files source - partitioning based on prefix of file

2018-07-31 Thread Averell
Hi Fabian,

Thanks for the information. I will try to look at the change to that complex
logic that you mentioned when I have time. That would save one more shuffle
(from 1 to 0), wouldn't it?

BTW, regarding fault tolerance in the file reader task, could you help
explain what would happen if the reader task crashes in the middle of reading
one split? E.g.: the split has 100 lines, and the reader crashed after
reading 30 lines. What would happen when the operator gets resumed? Would
those first 30 lines get reprocessed a 2nd time?

Those tens of thousands of files that I have are currently not in CSV
format. Each file has a header section of 10-20 lines (common data for
the node), then a data section with one CSV line for each record, then again
some common data, and finally a 2nd data section - one CSV line for each
record.
My current solution is to write a non-Flink job to preprocess those files
and bring them to standard CSV format to be the input for Flink.

I am thinking of doing this in Flink, with a custom file reader function
which works in a similar way to the wholeTextFiles function in Spark batch
processing. However, I don't know how to have fault tolerance in doing that
yet.

Thank you very much for your support.

Regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Pass JVM option (-Dconfig.file) per job in standalone mode?

2018-07-31 Thread Chang Liu
Dear all,

I would like to know whether there is a way to pass JVM options (for example, 
-Dconfig.file=application.conf) per submitted Flink job. I am using the 
Config library from Lightbend.

./bin/flink run examples/Example.jar -Dconfig.file=/path/application.conf

Best regards/祝好,

Chang Liu 刘畅




Re: Questions on Unbounded number of keys

2018-07-31 Thread ashish pok
Thanks Till, I will try to create an instance of the app with a smaller heap and get 
a couple of dumps as well. I should be ok to share that on Google Drive.


- Ashish

On Tuesday, July 31, 2018, 7:49 AM, Till Rohrmann  wrote:

Hi Ashish,
FIRE_AND_PURGE should also clear the window state. Yes I mean with active 
windows, windows which have not been purged yet.
Maybe Aljoscha knows more about why the window state is growing (I would not 
rule out a bug).
Cheers,Till
On Tue, Jul 31, 2018 at 1:45 PM ashish pok  wrote:

Hi Till,
Keys are unbounded (a group of events have same key but that key doesnt repeat 
after it is fired other than some odd delayed events). So basically there 1 key 
that will be aligned to a window. When you say key space of active windows, 
does that include keys for windows that have already fired and could be in 
memory footprint? If so, that is basically the problem I would get into and 
looking for a solution to clean-up. Like I said earlier overriding tigger to 
FIRE_AND_PURGE did not help. If I take the same stream and key and refactor it 
to how Chang is doing it with Process Function, issue goes away.
If you mean only currently processing key space of active windows (not the ones 
that have already fired)  then I would say, that cannot be the case. We are 
getting the data from period poll of same number of devices and uniqueness of 
key is simply a time identifier prefixed to device identifier. Even though 
there could be a little delayed data, the chances of number of unique keys 
growing constantly for days is probably none as device list is constant.
Thanks, Ashish


- Ashish

On Tuesday, July 31, 2018, 4:05 AM, Till Rohrmann  wrote:

Hi Ashish,
the processing time session windows need to store state in the StateBackends 
and I assume that your key space of active windows is constantly growing. That 
could explain why you are seeing an ever increasing memory footprint. But 
without knowing the input stream and what the UDFs do this is only a guess.
Cheers,Till
On Mon, Jul 30, 2018 at 1:43 PM Fabian Hueske  wrote:

Hi Chang,
The state handle objects are not created per key but just once per function 
instance.
Instead they route state accesses to the backend (JVM heap or RocksDB) for the 
currently active key.
Best, Fabian

2018-07-30 12:19 GMT+02:00 Chang Liu :

Hi Andrey,
Thanks for your reply. My question might be silly, but there is still one part 
I would like to fully understand. For example, in the following example:
class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed 
by Session ID
  lazy val userId: ValueState[String] = getRuntimeContext.getState(
new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))

  lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))

  override def processElement(
  click: Click,
  context: KeyedProcessFunction[String, Click, Click]#Context,
  out: Collector[Click])
  : Unit = {
// process, output, clear state if necessary
  }

  override def onTimer(
  timestamp: Long,
  ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
  out: Collector[Click])
  : Unit = {
// output and clear state
  }
}
Even though I am regularly clearing the two states, userId and clicks (which
means I am cleaning up the values stored in the States), my question is: then
what about the two State objects themselves, userId and clicks? These State
objects are also created per Session ID, right? If the number of Session IDs is
unbounded, then the number of these State objects is also unbounded.
That means there are userId-state-1 and clicks-state-1 for session-id-1,
userId-state-2 and clicks-state-2 for session-id-2, userId-state-3 and
clicks-state-3 for session-id-3, …, which are handled by different keyed
operator instances (or by the same one, if two keys from different ranges, as you
call them, are assigned to the same instance).
I am not concerned about the actual values in the State (which will be managed
carefully, if I am clearing them carefully). I am thinking about the State
objects themselves; I have no idea what is happening to them and what
will happen to them.
Many thanks :)
Best regards/祝好,

Chang Liu 刘畅



On 26 Jul 2018, at 10:55, Andrey Zagrebin  wrote:
Hi Chang Liu,
The unbounded nature of the stream, keyed or not, should not lead to out of
memory.
Flink parallel keyed operator instances have a fixed number (parallelism) and
just process some range of keyed elements; in your example it is a subrange of
session ids.
The keyed processed elements (http requests) are objects created when they
enter the pipeline and garbage collected after having been processed in
streaming fashion.
If they arrive very rapidly it can lead to high back pressure from upstream to 
downstream operators, buffers can become full and pipeline stops/slows down 
processing external inputs, it usually means that you
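
To illustrate the Process Function alternative mentioned earlier in this thread — keyed state that is explicitly emitted and cleared by a processing-time timer — here is a minimal sketch. It assumes Flink 1.5's KeyedProcessFunction; the Event type and its merge method are hypothetical placeholders for the actual payload:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Event and Event.merge are hypothetical placeholders.
class AggregateAndExpire(ttlMs: Long) extends KeyedProcessFunction[String, Event, Event] {

  // one logical value per key, stored in the configured state backend
  lazy val acc: ValueState[Event] = getRuntimeContext.getState(
    new ValueStateDescriptor[Event]("acc", createTypeInformation[Event]))

  override def processElement(
      event: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    val current = acc.value()
    if (current == null) {
      // first element for this key: arm a timer that will emit and clean up
      ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + ttlMs)
      acc.update(event)
    } else {
      acc.update(current.merge(event)) // reduce-style accumulation, one value per key
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Event, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    val result = acc.value()
    if (result != null) out.collect(result)
    acc.clear() // nothing is retained for this key after the timer fires
  }
}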

Re: Pass JVM option (-Dconfig.file) per job in standalone mode?

2018-07-31 Thread Chang Liu
Hi Dominik,

Thanks for your reply. I thought so as well. But I am having some problems with 
passing program args.

Currently, i have the following:

object Configs {

  lazy val CONFIG: Config = ConfigFactory.load()

  lazy val config1: String = CONFIG.getString("key1")
  lazy val config2: String = CONFIG.getString("key2")
  lazy val config3: String = CONFIG.getString("key3")
  ...


}

If I pass the application.conf as program args, it will be like this:


object Configs {

  def setup(file: String): Unit = CONFIG = ConfigFactory.parseFile(new 
File(file))

  var CONFIG: Config = _

  lazy val config1: String = CONFIG.getString("key1")
  lazy val config2: String = CONFIG.getString("key2")
  lazy val config3: String = CONFIG.getString("key3")
  ...


}


This Scala code is not looking good, I think:
- The CONFIG is a var instead of a val.
- I have to call setup every time.
- I have some other objects or vals which are initialised depending on the parameters.
  If the parameters are passed in as program args instead of via a JVM option,
  these objects/vals cannot be initialised properly.


Any ideas about it? Many thanks :)


Best regards/祝好,

Chang Liu 刘畅


> On 31 Jul 2018, at 14:15, Dominik Wosiński  wrote:
> 
> Hey, 
> I don't think that it's possible per job. You can pass only program arguments 
> but not JVM options. 
> 
> Regards,
> Dominik.
> 
> 2018-07-31 13:59 GMT+02:00 Chang Liu  >:
> Dear all,
> 
> I would like to know is there a way to pass JVM options (for example, 
> -Dconfig.file=application.conf) for each submitted flink job? I am using the 
> Config library from lightbend.
> 
> ./bin/flink run examples/Example.jar -Dconfig.file=/path/application.conf
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
> 
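
For what it's worth, one pattern that avoids the mutable CONFIG object is to resolve the Config once in main() from a program argument and keep it a val. A minimal sketch, assuming the Lightbend Config library and Flink's ParameterTool; the --config.file argument name is only an illustration:

import java.io.File
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.api.java.utils.ParameterTool

object Job {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    // fall back to application.conf on the classpath when no file is given
    val config: Config = Option(params.get("config.file"))
      .map(path => ConfigFactory.parseFile(new File(path)).resolve())
      .getOrElse(ConfigFactory.load())

    val key1 = config.getString("key1")
    // ... build and execute the pipeline using the resolved values ...
  }
}

It would then be submitted as, e.g.: ./bin/flink run examples/Example.jar --config.file /path/application.conf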



Access to Kafka Event Time

2018-07-31 Thread Vishal Santoshi
We have a use case where multiple topics are streamed to hdfs and we would
want to create buckets based on ingestion time (the time the events were
pushed to kafka). Our producers to kafka will set that event time.

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010

suggests that the "previousElementTimeStamp" will provide that
timestamp, provided the "EventTime" characteristic is set. It also provides
the element. In our case the element will expose a setIngestionTIme(long
time) method. Is the element in this method

public long extractTimestamp(Long element, long previousElementTimestamp)

passed by reference, and can it be safely (losslessly) mutated for
downstream operators?


That said there is another place where that record time stamp is available.

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141

Is it possible to change the signature of the

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java#L46

to add record timestamp as the last argument ?

Regards,

Vishal
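
As a rough sketch of what that could look like, assuming the EventTime characteristic is enabled and the 0.10+ Kafka consumer is used (so previousElementTimestamp carries the Kafka record timestamp); MyEvent and its setIngestionTime setter are hypothetical stand-ins for the actual type:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// MyEvent and setIngestionTime(...) are hypothetical.
class CopyKafkaTimestamp extends AssignerWithPeriodicWatermarks[MyEvent] {

  private var maxTs: Long = Long.MinValue + 1

  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    // the element is an ordinary JVM object, so a field set here is seen downstream;
    // copying instead of mutating is the safer option if in doubt
    element.setIngestionTime(previousElementTimestamp)
    maxTs = math.max(maxTs, previousElementTimestamp)
    previousElementTimestamp
  }

  override def getCurrentWatermark: Watermark = new Watermark(maxTs - 1)
}

The assigner would be attached right after the Kafka source with assignTimestampsAndWatermarks(new CopyKafkaTimestamp).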


Apache-flink -- Checkpointing to S3 Bucket

2018-07-31 Thread Chargel, Rafael
We have Apache Flink (1.4.2) running on an EMR cluster. We are checkpointing to 
an S3 bucket, and are pushing about 5,000 records per second through the flows. 
We recently saw the following error in our logs:
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on 
[Actor[akka.tcp://flink@ip-XXX-XXX-XXX-XXX:XX/user/taskmanager#-XXX]] 
after [1 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.messages.TaskManagerMessages$RequestTaskManagerLog".
  at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
  at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
  at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
  at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
  at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
  at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
  at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:442)
  at akka.dispatch.OnComplete.internal(Future.scala:258)
  at akka.dispatch.OnComplete.internal(Future.scala:256)
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
  at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
Immediately after this we got the following in our logs:
2018-07-30 15:08:32,177 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 831 @ 1532963312177
2018-07-30 15:09:46,750 ERROR 
org.apache.flink.runtime.blob.BlobServerConnection- PUT operation 
failed
java.io.EOFException: Read an incomplete length
  at org.apache.flink.runtime.blob.BlobUtils.readLength(BlobUtils.java:366)
  at 
org.apache.flink.runtime.blob.BlobServerConnection.readFileFully(BlobServerConnection.java:403)
  at 
org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:349)
  at 
org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:114)
At this point, the flow crashed and was not able to automatically recover, 
however we were able to restart the flow manually, without needing to change 
the location of the s3 bucket. The fact that the crash occurred while pushing 
to S3, makes me think that is the crux of the problem.
Any ideas?
Thanks,
Rafael

PS: I posted this to StackOverflow as well, and have had no responses: 
https://stackoverflow.com/questions/51597785/apache-flink-error-checkpointing-to-s3




Re: Access to Kafka Event Time

2018-07-31 Thread Vishal Santoshi
In fact it may be available elsewhere too (for example ProcessFunction,
etc.), but we have no need to create one; it is just a data relay (kafka
to hdfs) and any intermediate processing should be avoided if possible,
IMHO.

On Tue, Jul 31, 2018 at 9:10 AM, Vishal Santoshi 
wrote:

> We have a use case where multiple topics are streamed to hdfsand we would
> want to created buckets based on ingestion time ( the time the event were
> pushed to kafka ). Our producers to kafka will set that the event time
>
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/dev/connectors/kafka.html#using-kafka-
> timestamps-and-flink-event-time-in-kafka-010
>
> suggests that the the "previousElementTimeStamp" will provide that
> timestamp provided "EventTime" characteristic is set. It also provides for
> the element. In out case the element will expose setIngestionTIme(long
> time) method. Is the element in this method
>
> public long extractTimestamp(Long element, long previousElementTimestamp)
>
>  passed by reference and can it be safely ( loss lessly ) mutated for
> downstream operators ?
>
>
> That said there is another place where that record time stamp is available.
>
> https://github.com/apache/flink/blob/master/flink-
> connectors/flink-connector-kafka-0.9/src/main/java/org/
> apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>
> Is it possible to change the signature of the
>
> https://github.com/apache/flink/blob/master/flink-
> connectors/flink-connector-kafka-base/src/main/java/org/
> apache/flink/streaming/util/serialization/KeyedDeserializationSchema.
> java#L46
>
> to add record timestamp as the last argument ?
>
> Regards,
>
> Vishal
>
>
>
>
>
>
>


Re: Counting elements that appear "behind" the watermark

2018-07-31 Thread Julio Biason
Hey Elias,

Thanks for the tips. Unfortunately, it seems `Context` only has
information from the element being processed (
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java#L91)
and the RuntimeContext doesn't have access to any watermark information (
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L57
).

On Mon, Jul 30, 2018 at 10:28 PM, Elias Levy 
wrote:

> You can create a ProcessFunction.  That gives you access to
> getRuntimeContext to register metrics, to the element timestamp, and the
> current watermark.  Keep in mind that operators first process a record and
> then process any watermark that was the result of that record, so that when
> you get the current watermark from within the processElement method, the
> watermark generated from that element won't be the current watermark.
>
> On Mon, Jul 30, 2018 at 10:33 AM Julio Biason 
> wrote:
>
>> Hello,
>>
>> Our current watermark model is "some time behind the most recent seen
>> element" (very close to what the docs have in "Periodic Watermark"
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_
>> timestamps_watermarks.html#with-periodic-watermarks). It fits our
>> current processing model.
>>
>> The thing is, we want to extract information about elements appearing
>> behind the watermark, to give some insight when we need to update the
>> amount of time behind the most seen element we need. The problem is, I
>> can't create any metrics inside the AssignerWithPeriodicWatermarks 'cause
>> it has no `getRuntime()` to attach the metric.
>>
>> Is there any way we can count those (a ProcessFunction before the .
>> assignTimestampsAndWatermarks(), maybe)?
>>
>> --
>> *Julio Biason*, Software Engineer
>> *AZION*  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51
>> *99907 0554*
>>
>


-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


python vs java api

2018-07-31 Thread Nicos Maris
Which java features of flink are not supported by the python API when
writing a pipeline?


My task managers are not starting

2018-07-31 Thread Felipe Gutierrez
Hi all,

I am deploying a Flink cluster using the version
"flink-1.5.2-bin-hadoop28-scala_2.11.tgz". It is one master node and two
slave nodes. I have configured key-based ssh between all nodes so I can log
in without typing the password (also between the nodes of the cluster). When I start
the cluster it shows that it is starting the standalonesession for the
master node. Should that not be the JobManager? Or did it change in version
1.5+?

Additionally, my slave nodes do not start. I can see on the dashboard that
my cluster is running, but I have no taskmanagers to process my tasks.

# /etc/flink-1.5.2/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host sense1.
Starting taskexecutor daemon on host sense3.
Starting taskexecutor daemon on host sense4.

I have tried decreasing the memory heap size for the job manager and
task managers to 512 MB, but with no success. Could anyone help me to
start the cluster?

thanks,
Felipe


*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


Re: python vs java api

2018-07-31 Thread vino yang
Hi Nicos,

You can read the official documentation of latest Python API about
DataStream transformation[1] and latest Java API transformation[2].

However, the latest documentation may not reflect the newest features, especially
for the Python API, so you can also compare the implementations of
DataStream (Java) [3] and PythonDataStream [4]. Note, for [3], it's not
complete because some APIs exist in other stream objects such as the keyed data
stream.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/python.html#transformations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/index.html#datastream-transformations
[3]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
[4]:
https://github.com/apache/flink/blob/master/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java

Thanks, vino.


2018-07-31 22:59 GMT+08:00 Nicos Maris :

> Which java features of flink are not supported by the python API when
> writing a pipeline?
>


Re: My task managers are not starting

2018-07-31 Thread Felipe Gutierrez
Strange. I decreased the max value (TM_MAX_OFFHEAP_SIZE), even though it says the
slave will not use all the space, and it worked.

# Long.MAX_VALUE in TB: This is an upper bound, much less direct
memory will be used
# TM_MAX_OFFHEAP_SIZE="8388607T"



*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*

On Tue, Jul 31, 2018 at 5:38 PM, Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi all,
>
> I am deploying a Flink cluster using the version 
> "flink-1.5.2-bin-hadoop28-scala_2.11.tgz".
> It is one master node and two slave nodes. I have configured the key-ssh
> between all nodes so I can log in without type the password (also the nodes
> of the cluster). When I start the cluster it show that it is starting the
> standalonesession for the master node. Should not be the JobManager? Or did
> it changed for version 1.5.+?
>
> Additionally, my slave nodes do not start. I can see on the dashboard that
> my cluster is running, but I have no taskmanagers to process my tasks.
>
> # /etc/flink-1.5.2/bin/start-cluster.sh
> Starting cluster.
> Starting standalonesession daemon on host sense1.
> Starting taskexecutor daemon on host sense3.
> Starting taskexecutor daemon on host sense4.
>
> I have considered to decrease the memory heap size for the job manager and
> tasks manager for 512 MB, but with no success. Does anyone could help me to
> start the cluster?
>
> thanks,
> Felipe
>
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


Re: python vs java api

2018-07-31 Thread Nicos Maris
Thanks Vino,


Comparing functionalities in terms of the transformations is clear but what
about timestamps and state?

On Tue, Jul 31, 2018 at 6:47 PM vino yang  wrote:

> Hi Nicos,
>
> You can read the official documentation of latest Python API about
> DataStream transformation[1] and latest Java API transformation[2].
>
> However, the latest documentation may not react the new feature especially
> for Python API, so you can also compare the implementation of
> DataStream(java)[3] and PythonDataStream [4]. Note, for [3], it's not
> completed because some API exists in other stream objects such keyed data
> stream.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/python.html#transformations
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/index.html#datastream-transformations
> [3]:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
> [4]:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
>
> Thanks, vino.
>
>
> 2018-07-31 22:59 GMT+08:00 Nicos Maris :
>
>> Which java features of flink are not supported by the python API when
>> writing a pipeline?
>>
>
>


Re: Description of Flink event time processing

2018-07-31 Thread Fabian Hueske
Hi Elias,

Sorry for the delay. I just made a pass over the document.
I think it is very good.

Let's have a look where this fits best into the docs and check if there is
duplicate content on other pages that should be removed / reorganized.

Best, Fabian

2018-07-31 3:17 GMT+02:00 Elias Levy :

> Fabian,
>
> You have any time to review the changes?
>
> On Thu, Jul 19, 2018 at 2:19 AM Fabian Hueske  wrote:
>
>> Hi Elias,
>>
>> Thanks for the update!
>> I'll try to have another look soon.
>>
>> Best, Fabian
>>
>> 2018-07-11 1:30 GMT+02:00 Elias Levy :
>>
>>> Thanks for all the comments.  I've updated the document to account for
>>> the feedback.  Please take a look.
>>>
>>> On Fri, Jul 6, 2018 at 2:33 PM Elias Levy 
>>> wrote:
>>>
 Apologies.  Comments are now enabled.

 On Thu, Jul 5, 2018 at 6:09 PM Rong Rong  wrote:

> Hi Elias,
>
> Thanks for putting together the document. This is actually a very
> good, well-rounded document.
> I think you did not to enable access for comments for the link. Would
> you mind enabling comments for the google doc?
>
> Thanks,
> Rong
>
>
> On Thu, Jul 5, 2018 at 8:39 AM Fabian Hueske 
> wrote:
>
>> Hi Elias,
>>
>> Thanks for the great document!
>> I made a pass over it and left a few comments.
>>
>> I think we should definitely add this to the documentation.
>>
>> Thanks,
>> Fabian
>>
>> 2018-07-04 10:30 GMT+02:00 Fabian Hueske :
>>
>>> Hi Elias,
>>>
>>> I agree, the docs lack a coherent discussion of event time features.
>>> Thank you for this write up!
>>> I just skimmed your document and will provide more detailed feedback
>>> later.
>>>
>>> It would be great to add such a page to the documentation.
>>>
>>> Best, Fabian
>>>
>>> 2018-07-03 3:07 GMT+02:00 Elias Levy :
>>>
 The documentation of how Flink handles event time and watermarks is
 spread across several places.  I've been wanting a single location that
 summarizes the subject, and as none was available, I wrote one up.

 You can find it here: https://docs.google.com/document/d/1b5d-
 hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing

 I'd appreciate feedback, particularly about the correctness of the
 described behavior.

>>>
>>>
>>
>>


Re: [ANNOUNCE] Apache Flink 1.5.2 released

2018-07-31 Thread Bowen Li
Congratulations, community!

On Tue, Jul 31, 2018 at 1:44 AM Chesnay Schepler  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.5.2, which is the second bugfix release for the Apache Flink 1.5
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2018/07/31/release-1.5.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343588
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Chesnay
>


Rest API calls

2018-07-31 Thread yuvraj singh
Hi, I have a use case where I need to call REST APIs from a Flink job. I am not
getting much context from the internet; please help me with this.

Thanks


Re: Small-files source - partitioning based on prefix of file

2018-07-31 Thread Fabian Hueske
Hi Averell,

please find my answers inlined.

Best, Fabian

2018-07-31 13:52 GMT+02:00 Averell :

> Hi Fabian,
>
> Thanks for the information. I will try to look at the change to that
> complex
> logic that you mentioned when I have time. That would save one more shuffle
> (from 1 to 0), wouldn't that?
>

I'm not 100% sure about that. You would need to implement it in a way that
you can use the "reinterpret as keyed stream" feature which is currently
experimental [1].
Not sure if that's possible.


>
> BTW, regarding fault tolerant in the file reader task, could you help
> explain what would happen if the reader task crash in the middle of reading
> one split? E.g: the split has 100 lines, and the reader crashed after
> reading 30 lines. What would happen when the operator gets resumed? Would
> those first 30 lines get reprocessed the 2nd time?
>
>
This depends on the implementation of the InputFormat. If it implements the
CheckpointableInputFormat interface, it is able to checkpoint the current
reading position in a split and can be reset to that position during
recovery.
In either case, some records will be read twice, but if the reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well.


> Those tens of thousands of files that I have are currently not in CSV
> format. Each file has some heading session of 10-20 lines (common data for
> the node), then data session with one CSV line for each record, then again
> some common data, and finally, a 2nd data session - one CSV line for each
> record.
> My current solution is to write a non-Flink job to preprocess those files
> and bring them to standard CSV format to be the input for Flink.
>

You can implement that with a custom FileInputFormat.


>
> I am thinking of doing this in Flink, with a custom file reader function
> which works in a similar way to wholeTextFile function in Spark batch
> processing. However, I don't know how to have fault tolerance in doing that
> yet.
>
> Thank you very much for your support.
>
> Regards,
> Averell
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
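
To make the custom FileInputFormat idea a bit more concrete, here is a rough sketch that reads one of the described files as a whole and emits only the CSV record lines. looksLikeRecord is a placeholder for whatever distinguishes data lines in the real files, and this simple version does not implement CheckpointableInputFormat, so a recovered split is re-read from the beginning:

import java.io.{BufferedReader, InputStreamReader}
import org.apache.flink.api.common.io.FileInputFormat
import org.apache.flink.core.fs.FileInputSplit

class NodeFileInputFormat extends FileInputFormat[String] {

  unsplittable = true // read each file as a single split

  @transient private var reader: BufferedReader = _
  private var nextLine: String = _

  // placeholder heuristic for "this line is a CSV data record"
  private def looksLikeRecord(line: String): Boolean = line.count(_ == ',') >= 3

  override def open(split: FileInputSplit): Unit = {
    super.open(split) // opens the underlying `stream` for this split
    reader = new BufferedReader(new InputStreamReader(stream))
    advance()
  }

  private def advance(): Unit = {
    nextLine = reader.readLine()
    while (nextLine != null && !looksLikeRecord(nextLine)) {
      nextLine = reader.readLine()
    }
  }

  override def reachedEnd(): Boolean = nextLine == null

  override def nextRecord(reuse: String): String = {
    val current = nextLine
    advance()
    current
  }

  override def close(): Unit = {
    if (reader != null) reader.close()
    super.close()
  }
}

It could then be used with something like env.readFile(new NodeFileInputFormat, path).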


Converting a DataStream into a Table throws error

2018-07-31 Thread Mich Talebzadeh
Hi,

I am following this example

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api

This is my dataStream which is built on a Kafka topic

   //
//Create a Kafka consumer
//
val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
 //
 //
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker,
'timeissued, 'price)

While compiling it throws this error

[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

The topic is very simple, it is comma separated prices. I tried mapFunction
and flatMap but neither worked!

Thanks,


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Yahoo Streaming Benchmark on a Flink 1.5 cluster

2018-07-31 Thread Naum Gjorgjeski
Hi,

I am trying to run the data Artisans version of the Yahoo Streaming Benchmark.
The benchmark applications are written for Flink 1.0.1. However, I need them
to run on a Flink 1.5 cluster. When I try to build the benchmark applications
with any version of Flink from 1.3.0 or higher, I get many compile errors. The
compile errors state that some of the classes and methods cannot be found
(because part of the Flink API has changed in recent versions).

The classes that cannot be found are:
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner
org.apache.flink.api.common.state.OperatorState
org.apache.flink.api.common.state.StateBackend
org.apache.flink.runtime.state.AsynchronousStateHandle
org.apache.flink.runtime.state.StateHandle
org.apache.flink.streaming.runtime.tasks.StreamTaskState
org.apache.flink.runtime.state.AbstractStateBackend.CheckpointStateOutputView

The methods that cannot be found are:
org.apache.flink.streaming.api.operators.StreamOperator.snapshotOperatorState(long,long)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStateBackend()
org.apache.flink.runtime.state.AbstractStateBackend.createCheckpointStateOutputView(long,long)

I was able to find a substitution for a few of them (e.g. using
FlinkFixedPartitioner instead of FixedPartitioner), but for most of them there
are no straightforward substitutions. Could you please give me advice on
how to resolve this problem? Thank you in advance.

Best regards,
Naum
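
For the state-related pieces there is no drop-in replacement: the old OperatorState / StateHandle / snapshotOperatorState APIs were superseded by Flink's managed state interfaces (ValueState and friends for keyed state, CheckpointedFunction for operator state), so those parts of the benchmark need to be ported rather than just recompiled. A rough sketch of the new shape, with illustrative names and assuming Flink 1.5:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

class CountingMapper extends RichMapFunction[String, String] with CheckpointedFunction {

  @transient private var countState: ListState[java.lang.Long] = _
  private var count: Long = 0L

  override def map(value: String): String = {
    count += 1
    value
  }

  // roughly replaces the old snapshotOperatorState(long, long)
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    countState.clear()
    countState.add(count)
  }

  // roughly replaces restoring from the old StateHandle
  override def initializeState(context: FunctionInitializationContext): Unit = {
    countState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
    val it = countState.get().iterator()
    while (it.hasNext) count += it.next()
  }
}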



Re: Counting elements that appear "behind" the watermark

2018-07-31 Thread Elias Levy
Correct. Context gives you access to the element timestamp.
But it also gives you access to the current watermark via
timerService -> currentWatermark.
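
For example, a pass-through ProcessFunction placed right after assignTimestampsAndWatermarks could register a counter and compare each element's timestamp with the current watermark. A minimal sketch (keeping in mind the caveat above that the watermark seen in processElement does not yet reflect the element being processed):

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class LateElementCounter[T] extends ProcessFunction[T, T] {

  @transient private var behindWatermark: Counter = _

  override def open(parameters: Configuration): Unit = {
    behindWatermark = getRuntimeContext.getMetricGroup.counter("elementsBehindWatermark")
  }

  override def processElement(
      value: T,
      ctx: ProcessFunction[T, T]#Context,
      out: Collector[T]): Unit = {
    val ts = ctx.timestamp() // may be null if no timestamp is assigned
    if (ts != null && ts < ctx.timerService().currentWatermark()) {
      behindWatermark.inc()
    }
    out.collect(value) // pass the element through unchanged
  }
}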

On Tue, Jul 31, 2018 at 7:45 AM Julio Biason  wrote:

> Thanks for the tips. Unfortunately, it seems `Context` only have
> information from the element being processed (
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java#L91)
> and the RuntimeContext doesn't have access to any watermark information (
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L57
> ).
>
>


Re: Rest API calls

2018-07-31 Thread vino yang
Hi yuvraj,

The documentation of Flink REST API is here :
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/rest_api.html#monitoring-rest-api

Thanks, vino.

2018-08-01 3:29 GMT+08:00 yuvraj singh <19yuvrajsing...@gmail.com>:

> Hi I have a use case where I need to call rest apis from a flink . I am
> not getting much context form internet , please help me on this .
>
> Thanks
>
>


Re: Converting a DataStream into a Table throws error

2018-07-31 Thread Hequn Cheng
Hi, Mich

You can try adding "import org.apache.flink.table.api.scala._", so that the
Symbol can be recognized as an Expression.

Best, Hequn

On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> I am following this example
>
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/dev/table/common.html#integration-with-
> datastream-and-dataset-api
>
> This is my dataStream which is built on a Kafka topic
>
>//
> //Create a Kafka consumer
> //
> val dataStream =  streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>  //
>  //
>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker,
> 'timeissued, 'price)
>
> While compiling it throws this error
>
> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
> myPackage/md_streaming.scala:169: overloaded method value fromDataStream
> with alternatives:
> [error]   [T](dataStream: 
> org.apache.flink.streaming.api.datastream.DataStream[T],
> fields: String)org.apache.flink.table.api.Table 
> [error]   [T](dataStream: org.apache.flink.streaming.
> api.datastream.DataStream[T])org.apache.flink.table.api.Table
> [error]  cannot be applied to 
> (org.apache.flink.streaming.api.datastream.DataStreamSource[String],
> Symbol, Symbol, Symbol, Symbol)
> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
> 'ticker, 'timeissued, 'price)
> [error]^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
> The topic is very simple, it is comma separated prices. I tried
> mapFunction and flatMap but neither worked!
>
> Thanks,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: python vs java api

2018-07-31 Thread vino yang
Hi Nicos,

For Checkpoint you can see the API in PythonStreamExecutionEnvironment.

But it cannot set the TimeCharacteristic yet. I will create a JIRA issue for
this.

Thanks, vino.

2018-08-01 0:15 GMT+08:00 Nicos Maris :

> Thanks Vino,
>
>
> Comparing functionalities in terms of the transformations is clear but
> what about timestamps and state?
>
> On Tue, Jul 31, 2018 at 6:47 PM vino yang  wrote:
>
>> Hi Nicos,
>>
>> You can read the official documentation of latest Python API about
>> DataStream transformation[1] and latest Java API transformation[2].
>>
>> However, the latest documentation may not react the new feature
>> especially for Python API, so you can also compare the implementation of
>> DataStream(java)[3] and PythonDataStream [4]. Note, for [3], it's not
>> completed because some API exists in other stream objects such keyed data
>> stream.
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-
>> release-1.5/dev/stream/python.html#transformations
>> [2]: https://ci.apache.org/projects/flink/flink-docs-
>> release-1.5/dev/stream/operators/index.html#datastream-transformations
>> [3]: https://github.com/apache/flink/blob/master/
>> flink-streaming-java/src/main/java/org/apache/flink/
>> streaming/api/datastream/DataStream.java
>> [4]: https://github.com/apache/flink/blob/master/flink-libraries/flink-
>> streaming-python/src/main/java/org/apache/flink/streaming/python/api/
>> datastream/PythonDataStream.java
>>
>> Thanks, vino.
>>
>>
>> 2018-07-31 22:59 GMT+08:00 Nicos Maris :
>>
>>> Which java features of flink are not supported by the python API when
>>> writing a pipeline?
>>>
>>
>>


Re: Rest API calls

2018-07-31 Thread yuvraj singh
Hi vino, thanks for the information.
But I was looking for the use case where I need to call a web service on
the stream.

Thanks
Yubraj Singh

On Wed, Aug 1, 2018, 8:32 AM vino yang  wrote:

> Hi yuvraj,
>
> The documentation of Flink REST API is here :
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/rest_api.html#monitoring-rest-api
>
> Thanks, vino.
>
> 2018-08-01 3:29 GMT+08:00 yuvraj singh <19yuvrajsing...@gmail.com>:
>
>> Hi I have a use case where I need to call rest apis from a flink . I am
>> not getting much context form internet , please help me on this .
>>
>> Thanks
>>
>>
>
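
If the goal is to call an external web service for each element of a stream, Flink's async I/O is usually the tool for that. A minimal sketch, assuming Flink 1.5's AsyncDataStream / AsyncFunction API; httpGetAsync is a placeholder for whatever non-blocking HTTP client is actually used:

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

class RestEnricher extends AsyncFunction[String, String] {

  implicit lazy val ec: ExecutionContext = ExecutionContext.global

  // placeholder for a real non-blocking HTTP client call
  def httpGetAsync(url: String): Future[String] = Future(s"response for $url")

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    httpGetAsync(s"http://example.com/lookup?key=$input").onComplete {
      case Success(body) => resultFuture.complete(Iterable(body))
      case Failure(t)    => resultFuture.completeExceptionally(t)
    }
  }
}

// wiring it in, with a timeout and a cap on in-flight requests:
// val enriched = AsyncDataStream.unorderedWait(stream, new RestEnricher, 5, TimeUnit.SECONDS, 100)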


Re: Converting a DataStream into a Table throws error

2018-07-31 Thread vino yang
Hi Mich,

The fields specified in the fromDataStream API must match the number of
fields contained in the DataStream object; your DataStream's type is
just a String. An example is here [1].

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table

Thanks, vino.

2018-08-01 6:16 GMT+08:00 Mich Talebzadeh :

> Hi,
>
> I am following this example
>
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/dev/table/common.html#integration-with-
> datastream-and-dataset-api
>
> This is my dataStream which is built on a Kafka topic
>
>//
> //Create a Kafka consumer
> //
> val dataStream =  streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>  //
>  //
>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker,
> 'timeissued, 'price)
>
> While compiling it throws this error
>
> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/
> myPackage/md_streaming.scala:169: overloaded method value fromDataStream
> with alternatives:
> [error]   [T](dataStream: 
> org.apache.flink.streaming.api.datastream.DataStream[T],
> fields: String)org.apache.flink.table.api.Table 
> [error]   [T](dataStream: org.apache.flink.streaming.
> api.datastream.DataStream[T])org.apache.flink.table.api.Table
> [error]  cannot be applied to 
> (org.apache.flink.streaming.api.datastream.DataStreamSource[String],
> Symbol, Symbol, Symbol, Symbol)
> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
> 'ticker, 'timeissued, 'price)
> [error]^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
> The topic is very simple, it is comma separated prices. I tried
> mapFunction and flatMap but neither worked!
>
> Thanks,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
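
Putting the two suggestions together — the Scala Table API implicits so that Symbols are accepted as Expressions, plus a map that parses the comma-separated payload into a tuple so the field count matches — a minimal sketch. It assumes the Scala DataStream and Table APIs and that the topic really carries key,ticker,timeissued,price; the error message mentions a Java DataStreamSource, in which case the String-based fields overload of fromDataStream would be the alternative:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

// parse the comma-separated payload into a 4-field tuple first
val parsed: DataStream[(String, String, String, Double)] = dataStream.map { line =>
  val a = line.split(",")
  (a(0), a(1), a(2), a(3).toDouble) // assumes well-formed records
}

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
val table1: Table = tableEnv.fromDataStream(parsed, 'key, 'ticker, 'timeissued, 'price)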