Re: Event time didn't advance because of some idle slots

2018-07-30 Thread Reza Sameei
It's not a real solution, but why don't you change the parallelism of your
`SourceFunction`?
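
For illustration, a minimal Scala sketch of that idea (the source and watermark
assigner classes are placeholders, and the parallelism of 2 is an assumption
based on the two inputs that actually receive data):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Run only as many source subtasks as there are inputs that actually carry
// data, so that no idle subtask holds the overall watermark back.
val stream = env
  .addSource(new MySource)                                // placeholder SourceFunction[String]
  .setParallelism(2)                                      // assumption: only two inputs carry data
  .assignTimestampsAndWatermarks(new MyPeriodicAssigner)  // placeholder periodic assigner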

On Tue, Jul 31, 2018 at 10:51 AM Soheil Pourbafrani 
wrote:

> In Flink event-time mode, I use a periodic watermark to advance event
> time. Every slot extracts the event time from the incoming message and, to
> emit the watermark, subtracts a network delay from it, say 3000 ms.
>
> public Watermark getCurrentWatermark() {
>     return new Watermark(MAX_TIMESTAMP - DELAY);
> }
>
> I have 4 active slots. The problem is that only two slots receive incoming
> data, but all of them call getCurrentWatermark(). So consider a situation
> where threads 1 and 2 receive incoming data and threads 3 and 4 do not.
>
> Thread-1-watermark ---> 1541217659806
> Thread-2-watermark ---> 1541217659810
> Thread-3-watermark ---> (0 - 3000 = -3000)
> Thread-4-watermark ---> (0 - 3000 = -3000)
>
> Since Flink takes the lowest watermark across parallel instances as the
> overall watermark, time doesn't advance! If I change getCurrentWatermark() to:
>
> public Watermark getCurrentWatermark() {
>     return new Watermark(System.currentTimeMillis() - DELAY);
> }
>
> it solves the problem, but I don't want to use the machine's timestamp!
> How can I fix this?
>
>

-- 
رضا سامعی  | Reza Sameei | Software Developer | 09126662695


Event time didn't advance because of some idle slots

2018-07-30 Thread Soheil Pourbafrani
In Flink event-time mode, I use a periodic watermark to advance event time.
Every slot extracts the event time from the incoming message and, to emit the
watermark, subtracts a network delay from it, say 3000 ms.

public Watermark getCurrentWatermark() {
    return new Watermark(MAX_TIMESTAMP - DELAY);
}

I have 4 active slots. The problem is that only two slots receive incoming
data, but all of them call getCurrentWatermark(). So consider a situation
where threads 1 and 2 receive incoming data and threads 3 and 4 do not.

Thread-1-watermark ---> 1541217659806
Thread-2-watermark ---> 1541217659810
Thread-3-watermark ---> (0 - 3000 = -3000)
Thread-4-watermark ---> (0 - 3000 = -3000)

Since Flink takes the lowest watermark across parallel instances as the overall
watermark, time doesn't advance! If I change getCurrentWatermark() to:

public Watermark getCurrentWatermark() {
    return new Watermark(System.currentTimeMillis() - DELAY);
}

it solves the problem, but I don't want to use the machine's timestamp! How
can I fix this?


Re: Detect late data in processing time

2018-07-30 Thread vino yang
Hi Averell,

I personally don't recommend this.
Processing time uses the local physical clock of the node where the specific
task runs, rather than a timestamp assigned upstream in advance.
This is a bit like another time concept provided by Flink - Ingestion Time.
So, if you do not specify event time, then do not set watermarks.

Thanks, vino.

2018-07-31 12:03 GMT+08:00 Averell :

> Hi Soheil,
>
> Why don't you just use the processing time as the event time? Simply override
> extractTimestamp to return your processing time.
>
> Regards,
> Averell
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell,

Actually, performing a key partition inside the source function is the same
as DataStream[Source].keyBy(custom partitioner), because keyBy is not a
real operator but a virtual node in the DAG, which does not correspond to a
physical operator.

Thanks, vino.

2018-07-31 10:52 GMT+08:00 Averell :

> Hi Vino,
>
> I'm a little bit confused.
> If I can do the partitioning from within the source function, using the same
> hash function on the key to identify the partition, would that be sufficient
> to avoid shuffling in the next keyBy call?
>
> Thanks.
> Averell
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Detect late data in processing time

2018-07-30 Thread Averell
Hi Soheil,

Why don't you just use the processing time as the event time? Simply override
extractTimestamp to return your processing time.
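
For illustration, a minimal Scala sketch of this idea (the event type and the
use of an ascending-timestamp extractor are assumptions; note that vino's reply
elsewhere in this thread recommends simply using processing-time windows
instead):

import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor

// Hypothetical event type.
case class SensorEvent(id: String, value: Double)

// Treat the wall-clock arrival time as the event time.
class ProcessingTimeAsEventTime extends AscendingTimestampExtractor[SensorEvent] {
  override def extractAscendingTimestamp(event: SensorEvent): Long =
    System.currentTimeMillis()
}

// Usage: stream.assignTimestampsAndWatermarks(new ProcessingTimeAsEventTime)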

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink_taskmanager_job_task_operator_records_lag_max == -Inf on Flink 1.4.2

2018-07-30 Thread Tony Wei
Hi Julio,

As Gordon said, the `records_lag_max` metric is a Kafka-shipped metric [1].
I also found this thread [2] on the Kafka mailing list. It seems that this is
by design inside Kafka, so I think there is nothing we can do in the
Flink Kafka connector.

BTW, the Kafka documentation [1] says `records_lag_max` is the maximum lag
in terms of number of records for any partition in this "window". I'm not sure
what this "window" means or whether it is configurable. If it is configurable,
then you can directly pass the config argument to the Flink Kafka connector to
configure the Kafka consumer.
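
If it is, a minimal Scala sketch of passing such a setting through the consumer
properties might look like this (the broker address, group id, topic, and the
use of the standard Kafka client option `metrics.sample.window.ms` are all
assumptions, not something verified against your setup):

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092")   // assumption
props.setProperty("group.id", "my-consumer-group")     // assumption
// Widen the sampling window the Kafka client uses for sampled metrics
// such as records-lag-max (assumed standard Kafka consumer config).
props.setProperty("metrics.sample.window.ms", "60000")

// The Properties object is handed straight to the underlying Kafka consumer.
val consumer = new FlinkKafkaConsumer010[String]("my-topic", new SimpleStringSchema(), props)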

Best,
Tony Wei

[1] https://docs.confluent.io/current/kafka/monitoring.html#fetch-metrics
[2] https://lists.apache.org/thread.html/92475e5eb0c1a5fd08e49c30b3fef45213b8626e8fea8d52993c0d8c@%3Cusers.kafka.apache.org%3E

2018-07-31 1:36 GMT+08:00 Julio Biason :

> Hey Gordon,
>
> (Reviving this long thread) I think I found part of the problem: It seems
> the metric is capturing the lag from time to time and resetting the value
> in-between. I managed to replicate this by attaching a SQL Sink
> (JDBCOutputFormat) connecting to an outside database -- something that took
> about 2 minutes to write 500 records.
>
> I opened the ticket https://issues.apache.org/jira/browse/FLINK-9998 with
> a bit more information about this ('cause I completely forgot to open a
> ticket a month ago about this).
>
> On Thu, Jun 14, 2018 at 11:31 AM, Julio Biason 
> wrote:
>
>> Hey Gordon,
>>
>> The job restarted somewhere in the middle of the night (I haven't checked
>> why yet) and now I have this weird status of the first TaskManager with
>> only one valid lag, the second with 2 and the third with none.
>>
>> I dunno if I could see the partition in the logs, but all "numRecordsOut"
>> are increasing over time (attached the screenshot of the graphs).
>>
>> On Thu, Jun 14, 2018 at 5:27 AM, Tzu-Li (Gordon) Tai > > wrote:
>>
>>> Hi,
>>>
>>> Thanks for the extra information. So, there seems to be 2 separate
>>> issues here. I’ll go through them one by one.
>>>
>>> I was also checking our Grafana and the metric we were using was
>>> "flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max",
>>> actually. "flink_taskmanager_job_task_operator_records_lag_max" seems
>>> to be new (with the attempt thingy).
>>>
>>>
>>> After looking at the code changes in FLINK-8419, this unfortunately is an
>>> accidental “break” in the scope of the metric.
>>> In 1.4.0, the Kafka-shipped metrics were exposed under the
>>> “KafkaConsumer” metrics group. After FLINK-8419, this was changed, as you
>>> observed.
>>> In 1.5.0, however, I think the metrics are exposed under both patterns.
>>>
>>> Now, with the fact that some subtasks are returning -Inf for
>>> ‘record-lag-max’:
>>> If I understood the metric semantics correctly, this metric represents
>>> the "max record lag across **partitions subscribed by a Kafka consumer
>>> client**".
>>> So, the only possibility that I could think of causing this is that
>>> either the subtask does not have any partitions assigned to it, or simply
>>> there is a bug with the Kafka client returning this value.
>>>
>>> Is it possible that you verify that all subtasks have a partition
>>> assigned to it? That should be possible by just checking the job status in
>>> the Web UI, and observe the numRecordsOut value for each source subtask.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 13 June 2018 at 2:05:17 PM, Julio Biason (julio.bia...@azion.com)
>>> wrote:
>>>
>>> Hi Gordon,
>>>
>>> We have Kafka 0.10.1.1 running and use the flink-connector-kafka-0.10
>>> driver.
>>>
>>> There are a bunch of flink_taskmanager_job_task_operator_* metrics,
>>> including some about the committed offset for each partition. It seems I
>>> have 4 different records_lag_max with different attempt_id, though, 3 with
>>> -Inf and 1 with a value -- so I will need to understand Prometheus a bit
>>> better to extract this properly.
>>>
>>> I was also checking our Grafana and the metric we were using was
>>> "flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max",
>>> actually. "flink_taskmanager_job_task_operator_records_lag_max" seems
>>> to be new (with the attempt thingy).
>>>
>>> On the "KafkaConsumer" front, but it only has the "commited_offset" for
>>> each partition.
>>>
>>> On Wed, Jun 13, 2018 at 5:41 AM, Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
 Hi,

 Which Kafka version are you using?

 AFAIK, the only recent changes to Kafka connector metrics in the 1.4.x
 series would be FLINK-8419 [1].
 The ‘records_lag_max’ metric is a Kafka-shipped metric simply forwarded
 from the internally used Kafka client, so nothing should have been 
 affected.

 Do you see other metrics under the pattern of
 ‘flink_taskmanager_job_task_operator_*’? All Kafka-shipped metrics
 should still follow this pattern.
 If not, could you find the ‘records_lag_max’ metric (or any other
 Kafka-shipped metrics [2]) under the u

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Hi Vino,

I'm a little bit confused. 
If I can do the partitioning from within the source function, using the same
hash function on the key to identify the partition, would that be sufficient
to avoid shuffling in the next keyBy call?

Thanks.
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: splitting DataStream throws error

2018-07-30 Thread vino yang
I answered Mich privately; copying it here:

*Hi Mich,*

*Using split directly on the stream object is wrong.*
*split is meant to split the elements of a stream into separate output streams,
not to parse the format of each element. In this scenario, if you want to parse
the data, apply a map function right after the source stream and parse each
record in it.*
*Of course, you could also write a custom SourceFunction and parse the data
directly as you consume it, but you are already using the Kafka consumer
provided by Flink, which does not provide this functionality.*

*So I suggest you do it with a MapFunction.*
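
A minimal Scala sketch of that suggestion (the tuple layout follows the sample
row in the quoted mail, and `dataStream` is assumed to be the DataStream[String]
produced by the Kafka source below):

import org.apache.flink.streaming.api.scala._

// Parse each comma-separated line into (id, ticker, timestamp, price).
val parsed: DataStream[(String, String, String, Float)] = dataStream
  .map { line =>
    val cols = line.split(",").map(_.trim)
    (cols(0), cols(1), cols(2), cols(3).toFloat)
  }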

*Thanks, vino.*


2018-07-31 5:21 GMT+08:00 Mich Talebzadeh :

> Thanks
>
> So the assumption is that one cannot perform split on DataStream[String]
> directly?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 30 Jul 2018 at 21:54, Chesnay Schepler  wrote:
>
>> You define a flatMap function that takes a string, calls String#split on
>> it and collects the array.
>>
>> On 30.07.2018 22:04, Mich Talebzadeh wrote:
>>
>>
>> Hi,
>>
>> I have Kafka streaming feeds where a row looks like below where fields
>> are separated by ","
>> I can split them easily with split function
>>
>> scala> val oneline = "05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48"
>> oneline: String = 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
>> scala> oneline.split(",")
>> res26: Array[String] = Array(05521df6-4ccf-4b2f-b874-eb27d461b305, IBM, 2018-07-30T19:51:50, 190.48)
>>
>> I can get the individual columns as below
>>
>> scala> val key = oneline.split(",").map(_.trim).view(0).toString
>> key: String = 05521df6-4ccf-4b2f-b874-eb27d461b305
>> scala> val key = oneline.split(",").map(_.trim).view(1).toString
>> key: String = IBM
>> scala> val key = oneline.split(",").map(_.trim).view(2).toString
>> key: String = 2018-07-30T19:51:50
>> scala> val key = oneline.split(",").map(_.trim).view(3).toFloat
>> key: Float = 190.48
>>
>> Now when I apply the same to dataStream in flink it fails
>>
>> val dataStream =  streamExecEnv
>>    .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>
>> dataStream.split(",")
>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:154: type mismatch;
>> [error]  found   : String(",")
>> [error]  required: org.apache.flink.streaming.api.collector.selector.OutputSelector[String]
>> [error] dataStream.split(",")
>> [error]  ^
>> [error] one error found
>> [error] (compile:compileIncremental) Compilation failed
>>
>> What operation do I need to do on dataStream to make this split work?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>>


Re: scala IT

2018-07-30 Thread vino yang
Hi Nicos,

The thrown exception has given you a clear solution hint:

   The return type of function 'apply(MultiplyByTwoTest.scala:43)' could not be
   determined automatically, due to type erasure. You can give type information
   hints by using the returns(...) method on the result of the transformation
   call, or by letting your function implement the 'ResultTypeQueryable' interface.

You can consider the second option. More information on the Flink type
system can be found in the official documentation[1].
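
For instance, a minimal Scala sketch of the ResultTypeQueryable option (the
function name and types are assumptions based on the test in your repository):

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.scala._

// Declaring the produced type explicitly side-steps Java type erasure.
class MultiplyByTwo extends MapFunction[Long, Long] with ResultTypeQueryable[Long] {
  override def map(value: Long): Long = value * 2
  override def getProducedType: TypeInformation[Long] = createTypeInformation[Long]
}

With the Scala DataStream API, importing org.apache.flink.streaming.api.scala._
also makes the implicit TypeInformation available, which is often enough on its own.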

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/types_serialization.html#type-information-in-the-scala-api

Thanks, vino.

2018-07-31 6:30 GMT+08:00 Nicos Maris :

> Hi all,
>
>
> the integration test in scala documented at the testing section fails:
>
> https://travis-ci.org/nicosmaris/HelloDockerScalaSbt/builds/410075764
>
> In previous commits of my demo repo, I tried typextractor , basictypeinfo
> and resultTypeQuerable with no success. I am new to flink and to Scala and
> I would like to have both, not flink with Java.
>
> Am I doing something that is fundamentally wrong?
>
>
> thanks,
> Nicos Maris
>


Re: scala IT

2018-07-30 Thread Hequn Cheng
Hi Nicos,

It is weird. Have you updated your code? I checked your code and the function
has implemented ResultTypeQueryable. The code should work well.

Best, Hequn

On Tue, Jul 31, 2018 at 6:30 AM, Nicos Maris  wrote:

> Hi all,
>
>
> the integration test in scala documented at the testing section fails:
>
> https://travis-ci.org/nicosmaris/HelloDockerScalaSbt/builds/410075764
>
> In previous commits of my demo repo, I tried typextractor , basictypeinfo
> and resultTypeQuerable with no success. I am new to flink and to Scala and
> I would like to have both, not flink with Java.
>
> Am I doing something that is fundamentally wrong?
>
>
> thanks,
> Nicos Maris
>


Re: Flink on Mesos: containers question

2018-07-30 Thread Renjie Liu
Hi,
As I said, the Docker process and the job manager process are the same one.

To start the task managers in Docker, you need to set
"mesos.resourcemanager.tasks.container.type" to "docker" in the job master
config; otherwise Flink will just start the task managers as plain processes.
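
For example, a minimal flink-conf.yaml fragment (the image name is a
placeholder, and the image-name key is my assumption of the matching option):

mesos.resourcemanager.tasks.container.type: docker
mesos.resourcemanager.tasks.container.image.name: myregistry/flink:1.4.2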

I don't understand what you mean when you say that you can't access the VM's filesystem.

On Tue, Jul 31, 2018 at 2:25 AM NEKRASSOV, ALEXEI  wrote:

> Renjie,
>
>
>
> In my observation Task Managers don’t run in Docker containers – they run
> as JVM processes directly on the VM.
>
> The only Docker container is the one that runs Job Manager.
>
>
>
> What am I missing?
>
>
>
> Thanks,
>
> Alex
>
>
>
> *From:* Renjie Liu [mailto:liurenjie2...@gmail.com]
> *Sent:* Friday, July 20, 2018 8:56 PM
> *To:* Till Rohrmann 
> *Cc:* NEKRASSOV, ALEXEI ; Fabian Hueske ;
> user 
>
>
> *Subject:* Re: Flink on Mesos: containers question
>
>
>
> Hi, Alexei:
>
>
>
> What you pasted is expected behavior. The job manager and the two task
> managers should each run in a Docker instance.
>
>
>
> 13276 should be the process of the job manager, and it's the same process
> as 789. They have different process ids because they are shown in different
> namespaces (a concept in cgroups, which Docker actually depends on).
>
>
>
> On Thu, Jul 19, 2018 at 10:00 PM Till Rohrmann 
> wrote:
>
> Hi Alexei,
>
>
>
> I actually never used Mesos with container images. I always used it in a
> way where the Mesos task directly starts the Java process.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Jul 19, 2018 at 2:44 PM NEKRASSOV, ALEXEI  wrote:
>
> Till,
>
>
>
> Any insight into how Flink components are containerized in Mesos?
>
>
>
> Thanks!
>
> Alex
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Monday, July 16, 2018 7:57 AM
> *To:* NEKRASSOV, ALEXEI 
> *Cc:* user@flink.apache.org; Till Rohrmann 
> *Subject:* Re: Flink on Mesos: containers question
>
>
>
> Hi Alexei,
>
>
>
> Till (in CC) is familiar with Flink's Mesos support in 1.4.x.
>
>
>
> Best, Fabian
>
>
>
> 2018-07-13 15:07 GMT+02:00 NEKRASSOV, ALEXEI :
>
> Can someone please clarify how Flink on Mesos in containerized?
>
>
>
> On 5-node Mesos cluster I started Flink (1.4.2) with two Task Managers.
> Mesos shows “flink” task and two “taskmanager” tasks, all on the same VM.
>
> On that VM I see one Docker container running a process that seems to be
> Mesos App Master:
>
>
>
> $ docker ps -a
>
> CONTAINER IDIMAGE
> COMMAND  CREATED STATUS
> PORTS   NAMES
>
> 97b6840466c0mesosphere/dcos-flink:1.4.2-1.0   "/bin/sh -c
> /sbin/..."   41 hours agoUp 41 hours
> mesos-a0079d85-9ccb-4c43-8d31-e6b1ad750197
>
> $ docker exec 97b6840466c0 /bin/ps -efww
>
> UIDPID  PPID  C STIME TTY  TIME CMD
>
> root 1 0  0 Jul11 ?00:00:00 /bin/sh -c /sbin/init.sh
>
> root 7 1  0 Jul11 ?00:00:02 runsvdir -P /etc/service
>
> root 8 7  0 Jul11 ?00:00:00 runsv flink
>
> root   629 0  0 Jul12 pts/000:00:00 /bin/bash
>
> root   789 8  1 Jul12 ?00:09:16
> /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath
> /flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
> -Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
> -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties
> -Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml
> org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner
> -Dblob.server.port=23170 -Djobmanager.heap.mb=256
> -Djobmanager.rpc.port=23169 -Djobmanager.web.port=23168
> -Dmesos.artifact-server.port=23171 -Dmesos.initial-tasks=2
> -Dmesos.resourcemanager.tasks.cpus=2 -Dmesos.resourcemanager.tasks.mem=2048
> -Dtaskmanager.heap.mb=512 -Dtaskmanager.memory.preallocate=true
> -Dtaskmanager.numberOfTaskSlots=1 -Dparallelism.default=1
> -Djobmanager.rpc.address=localhost -Dmesos.resourcemanager.framework.role=*
> -Dsecurity.kerberos.login.use-ticket-cache=true
>
> root  1027 0  0 12:54 ?00:00:00 /bin/ps -efww
>
>
>
> Then on the VM itself I see another process with the same command line as
> the one in the container:
>
>
>
> root 13276  9689  1 Jul12 ?00:09:18
> /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath /flink
> -1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink
> -shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink
> -1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
> -Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
> -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties
> -Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml org.apac

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell,

The keyBy transformation triggers a key partitioning, which is one of the
various partitioning types supported by Flink, and it causes the data to be
shuffled.
It routes records whose keys have the same hash value to the same node, based
on the hash of the key you passed (or produced by the custom partitioner).
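
A minimal Scala sketch of that keyBy step (the case class and the key field are
assumptions about the small-files use case being discussed):

import org.apache.flink.streaming.api.scala._

// Hypothetical record emitted by the file source.
case class FileRecord(filePrefix: String, payload: String)

// keyBy is a virtual partitioning step, not a separate physical operator:
// records whose keys hash alike are routed to the same downstream subtask.
def partitionByPrefix(records: DataStream[FileRecord]): KeyedStream[FileRecord, String] =
  records.keyBy(_.filePrefix)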

Thanks, vino.

2018-07-31 9:33 GMT+08:00 Averell :

> Oh, thank you Vino. I was not aware of that reshuffling after every custom
> partitioning. Why would that be needed, though?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Oh, thank you Vino. I was not aware of that reshuffling after every custom
partitioning. Why would that be needed, though?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Counting elements that appear "behind" the watermark

2018-07-30 Thread Elias Levy
You can create a ProcessFunction.  That gives you access to
getRuntimeContext to register metrics, to the element timestamp, and the
current watermark.  Keep in mind that operators first process a record and
then process any watermark that was the result of that record, so that when
you get the current watermark from within the processElement method, the
watermark generated from that element won't be the current watermark.
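
A minimal Scala sketch along these lines, placed downstream of the
timestamp/watermark assigner (the event type and metric name are assumptions):

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class MyEvent(id: String, eventTime: Long)  // hypothetical event type

// Counts elements whose timestamp is already behind the current watermark.
class LateElementCounter extends ProcessFunction[MyEvent, MyEvent] {
  @transient private var behindWatermark: Counter = _

  override def open(parameters: Configuration): Unit = {
    behindWatermark = getRuntimeContext.getMetricGroup.counter("elementsBehindWatermark")
  }

  override def processElement(event: MyEvent,
                              ctx: ProcessFunction[MyEvent, MyEvent]#Context,
                              out: Collector[MyEvent]): Unit = {
    // As noted above, the watermark seen here was produced by earlier elements.
    val ts = ctx.timestamp()
    if (ts != null && ts < ctx.timerService().currentWatermark()) {
      behindWatermark.inc()
    }
    out.collect(event)
  }
}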

On Mon, Jul 30, 2018 at 10:33 AM Julio Biason 
wrote:

> Hello,
>
> Our current watermark model is "some time behind the most recent seen
> element" (very close to what the docs have in "Periodic Watermark"
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks).
> It fits our current processing model.
>
> The thing is, we want to extract information about elements appearing
> behind the watermark, to give some insight when we need to update the
> amount of time behind the most seen element we need. The problem is, I
> can't create any metrics inside the AssignerWithPeriodicWatermarks 'cause
> it has no `getRuntime()` to attach the metric.
>
> Is there any way we can count those (a ProcessFunction before the
> .assignTimestampsAndWatermarks(), maybe)?
>
> --
> *Julio Biason*, Sofware Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>


Re: Counting elements that appear "behind" the watermark

2018-07-30 Thread Hequn Cheng
Hi Julio,

If I understand correctly, you want to adjust your watermarks automatically?
It is true that there is no direct way to register metrics from an
AssignerWithPeriodicWatermarks. Adding a ProcessFunction before
assignTimestampsAndWatermarks seems like a solution. In the ProcessFunction,
you can count the late elements and send that count downstream to
assignTimestampsAndWatermarks to adjust the watermarks.

Best, Hequn

On Tue, Jul 31, 2018 at 1:32 AM, Julio Biason 
wrote:

> Hello,
>
> Our current watermark model is "some time behind the most recent seen
> element" (very close to what the docs have in "Periodic Watermark"
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_
> timestamps_watermarks.html#with-periodic-watermarks). It fits our current
> processing model.
>
> The thing is, we want to extract information about elements appearing
> behind the watermark, to give some insight when we need to update the
> amount of time behind the most seen element we need. The problem is, I
> can't create any metrics inside the AssignerWithPeriodicWatermarks 'cause
> it has no `getRuntime()` to attach the metric.
>
> Is there any way we can count those (a ProcessFunction before the .
> assignTimestampsAndWatermarks(), maybe)?
>
> --
> *Julio Biason*, Sofware Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>


Re: Description of Flink event time processing

2018-07-30 Thread Elias Levy
Fabian,

Have you had any time to review the changes?

On Thu, Jul 19, 2018 at 2:19 AM Fabian Hueske  wrote:

> Hi Elias,
>
> Thanks for the update!
> I'll try to have another look soon.
>
> Best, Fabian
>
> 2018-07-11 1:30 GMT+02:00 Elias Levy :
>
>> Thanks for all the comments.  I've updated the document to account for
>> the feedback.  Please take a look.
>>
>> On Fri, Jul 6, 2018 at 2:33 PM Elias Levy 
>> wrote:
>>
>>> Apologies.  Comments are now enabled.
>>>
>>> On Thu, Jul 5, 2018 at 6:09 PM Rong Rong  wrote:
>>>
 Hi Elias,

 Thanks for putting together the document. This is actually a very good,
 well-rounded document.
 I think you did not enable comment access for the link. Would
 you mind enabling comments for the Google doc?

 Thanks,
 Rong


 On Thu, Jul 5, 2018 at 8:39 AM Fabian Hueske  wrote:

> Hi Elias,
>
> Thanks for the great document!
> I made a pass over it and left a few comments.
>
> I think we should definitely add this to the documentation.
>
> Thanks,
> Fabian
>
> 2018-07-04 10:30 GMT+02:00 Fabian Hueske :
>
>> Hi Elias,
>>
>> I agree, the docs lack a coherent discussion of event time features.
>> Thank you for this write up!
>> I just skimmed your document and will provide more detailed feedback
>> later.
>>
>> It would be great to add such a page to the documentation.
>>
>> Best, Fabian
>>
>> 2018-07-03 3:07 GMT+02:00 Elias Levy :
>>
>>> The documentation of how Flink handles event time and watermarks is
>>> spread across several places.  I've been wanting a single location that
>>> summarizes the subject, and as none was available, I wrote one up.
>>>
>>> You can find it here:
>>> https://docs.google.com/document/d/1b5d-hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing
>>>
>>> I'd appreciate feedback, particularly about the correctness of the
>>> described behavior.
>>>
>>
>>
>
>


scala IT

2018-07-30 Thread Nicos Maris
Hi all,


the integration test in scala documented at the testing section fails:

https://travis-ci.org/nicosmaris/HelloDockerScalaSbt/builds/410075764

In previous commits of my demo repo, I tried TypeExtractor, BasicTypeInfo,
and ResultTypeQueryable with no success. I am new to both Flink and Scala, and
I would like to use both together, not Flink with Java.

Am I doing something that is fundamentally wrong?


thanks,
Nicos Maris


Re: splitting DataStream throws error

2018-07-30 Thread Mich Talebzadeh
Thanks

So the assumption is that one cannot perform split on DataStream[String]
directly?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 30 Jul 2018 at 21:54, Chesnay Schepler  wrote:

> You define a flatMap function that takes a string, calls String#split on
> it and collects the array.
>
> On 30.07.2018 22:04, Mich Talebzadeh wrote:
>
>
> Hi,
>
> I have Kafka streaming feeds where a row looks like below where fields are
> separated by ","
> I can split them easily with split function
>
> scala> val oneline =
> "05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48"
> oneline: String =
> 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
> scala> oneline.split(",")
> res26: Array[String] = Array(05521df6-4ccf-4b2f-b874-eb27d461b305, IBM,
> 2018-07-30T19:51:50, 190.48)
>
> I can get the individual columns as below
>
> scala>val key = oneline.split(",").map(_.trim).view(0).toString
> key: String = 05521df6-4ccf-4b2f-b874-eb27d461b305
> scala>val key = oneline.split(",").map(_.trim).view(1).toString
> key: String = IBM
> scala>val key = oneline.split(",").map(_.trim).view(2).toString
> key: String = 2018-07-30T19:51:50
> scala>val key = oneline.split(",").map(_.trim).view(3).toFloat
> key: Float = 190.48
>
> Now when I apply the same to dataStream in flink it fails
>
> val dataStream =  streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>
>
> *dataStream.split(",") *
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:154:
> type mismatch;
> [error]  found   : String(",")
> [error]  required:
> org.apache.flink.streaming.api.collector.selector.OutputSelector[String]
> [error] dataStream.split(",")
> [error]  ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
> What operation do I need to do on dataStream to make this split work?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>


Re: splitting DataStream throws error

2018-07-30 Thread Chesnay Schepler
You define a flatMap function that takes a string, calls String#split on 
it and collects the array.
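
A minimal Scala sketch of that (assuming `dataStream` is the DataStream[String]
from the Kafka source in the quoted code below):

import org.apache.flink.streaming.api.scala._

// Split each comma-separated line into its fields; each emitted element is
// the array of trimmed fields for one input line.
val fields: DataStream[Array[String]] = dataStream.flatMap { line =>
  Seq(line.split(",").map(_.trim))
}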


On 30.07.2018 22:04, Mich Talebzadeh wrote:


Hi,

I have Kafka streaming feeds where a row looks like below where fields 
are separated by ","

I can split them easily with split function

scala> val oneline = "05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48"
oneline: String = 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

scala> oneline.split(",")
res26: Array[String] = Array(05521df6-4ccf-4b2f-b874-eb27d461b305, IBM, 2018-07-30T19:51:50, 190.48)

I can get the individual columns as below

scala> val key = oneline.split(",").map(_.trim).view(0).toString
key: String = 05521df6-4ccf-4b2f-b874-eb27d461b305
scala> val key = oneline.split(",").map(_.trim).view(1).toString
key: String = IBM
scala> val key = oneline.split(",").map(_.trim).view(2).toString
key: String = 2018-07-30T19:51:50
scala> val key = oneline.split(",").map(_.trim).view(3).toFloat
key: Float = 190.48

Now when I apply the same to dataStream in flink it fails

val dataStream =  streamExecEnv
    .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

dataStream.split(",")
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:154: type mismatch;
[error]  found   : String(",")
[error]  required: org.apache.flink.streaming.api.collector.selector.OutputSelector[String]
[error] dataStream.split(",")
[error]  ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

What operation do I need to do on dataStream to make this split work?

Thanks

Dr Mich Talebzadeh

LinkedIn 
/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/


http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk.Any and all responsibility for 
any loss, damage or destruction of data or any other property which 
may arise from relying on this email's technical content is explicitly 
disclaimed. The author will in no case be liable for any monetary 
damages arising from such loss, damage or destruction.






Re: AM Delegation Token Regeneration

2018-07-30 Thread Shuyi Chen
Hi Paul, currently Flink intentionally disables delegation tokens and only uses
keytabs. I am not aware that DT regeneration is part of FLIP-6 (@till, correct
me if I am wrong). I've created a security improvement design to document some
of the changes we can make to improve Flink's security framework; it would be
great if you could take a look and let us know what you think. Thanks a lot.

Shuyi

On Mon, Jul 30, 2018 at 4:58 AM Paul Lam  wrote:

> Hi,
> At present, Flink distribute keytabs via YARN to the nodes that is running
> a Flink job, and this might be a potential security problem. I’ve read
> FLINK-3670 and the corresponding mail list discussions, and I think a more
> appropriate implementation would be like Spark’s: regenerate delegation
> tokens in AM and the containers just get the generated delegation token
> instead of the whole keytab. Also, I noticed that Dispatcher was introduced
> in FLIP-6 and one of its functionality is acquiring user’s authentication
> tokens. So, my question is, is delegation token regeneration part of
> FLIP-6? If not, would it be supported in the future?
>
> Best regards,
> Paul Lam



-- 
"So you have to trust that the dots will somehow connect in your future."


splitting DataStream throws error

2018-07-30 Thread Mich Talebzadeh
Hi,

I have Kafka streaming feeds where a row looks like the one below, with fields
separated by ",".
I can split them easily with the split function:

scala> val oneline =
"05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48"
oneline: String =
05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
scala> oneline.split(",")
res26: Array[String] = Array(05521df6-4ccf-4b2f-b874-eb27d461b305, IBM,
2018-07-30T19:51:50, 190.48)

I can get the individual columns as below

scala>val key = oneline.split(",").map(_.trim).view(0).toString
key: String = 05521df6-4ccf-4b2f-b874-eb27d461b305
scala>val key = oneline.split(",").map(_.trim).view(1).toString
key: String = IBM
scala>val key = oneline.split(",").map(_.trim).view(2).toString
key: String = 2018-07-30T19:51:50
scala>val key = oneline.split(",").map(_.trim).view(3).toFloat
key: Float = 190.48

Now when I apply the same to dataStream in flink it fails

val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))


*dataStream.split(",")*
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:154:
type mismatch;
[error]  found   : String(",")
[error]  required:
org.apache.flink.streaming.api.collector.selector.OutputSelector[String]
[error] dataStream.split(",")
[error]  ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

What operation do I need to do on dataStream to make this split work?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


RE: Flink on Mesos: containers question

2018-07-30 Thread NEKRASSOV, ALEXEI
Renjie,

In my observation Task Managers don’t run in Docker containers – they run as 
JVM processes directly on the VM.
The only Docker container is the one that runs Job Manager.

What am I missing?

Thanks,
Alex

From: Renjie Liu [mailto:liurenjie2...@gmail.com]
Sent: Friday, July 20, 2018 8:56 PM
To: Till Rohrmann 
Cc: NEKRASSOV, ALEXEI ; Fabian Hueske ; user 

Subject: Re: Flink on Mesos: containers question

Hi, Alexei:

What you pasted is expected behavior. The job manager and the two task managers
should each run in a Docker instance.

13276 should be the process of the job manager, and it's the same process as
789. They have different process ids because they are shown in different
namespaces (a concept in cgroups, which Docker actually depends on).

On Thu, Jul 19, 2018 at 10:00 PM Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:
Hi Alexei,

I actually never used Mesos with container images. I always used it in a way 
where the Mesos task directly starts the Java process.

Cheers,
Till

On Thu, Jul 19, 2018 at 2:44 PM NEKRASSOV, ALEXEI 
mailto:an4...@att.com>> wrote:
Till,

Any insight into how Flink components are containerized in Mesos?

Thanks!
Alex

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Monday, July 16, 2018 7:57 AM
To: NEKRASSOV, ALEXEI mailto:an4...@att.com>>
Cc: user@flink.apache.org; Till Rohrmann 
mailto:trohrm...@apache.org>>
Subject: Re: Flink on Mesos: containers question

Hi Alexei,

Till (in CC) is familiar with Flink's Mesos support in 1.4.x.

Best, Fabian

2018-07-13 15:07 GMT+02:00 NEKRASSOV, ALEXEI 
mailto:an4...@att.com>>:
Can someone please clarify how Flink on Mesos in containerized?

On 5-node Mesos cluster I started Flink (1.4.2) with two Task Managers. Mesos 
shows “flink” task and two “taskmanager” tasks, all on the same VM.
On that VM I see one Docker container running a process that seems to be Mesos 
App Master:

$ docker ps -a
CONTAINER IDIMAGE COMMAND  
CREATED STATUS  PORTS   NAMES
97b6840466c0mesosphere/dcos-flink:1.4.2-1.0   "/bin/sh -c /sbin/..."   
41 hours agoUp 41 hours 
mesos-a0079d85-9ccb-4c43-8d31-e6b1ad750197
$ docker exec 97b6840466c0 /bin/ps -efww
UIDPID  PPID  C STIME TTY  TIME CMD
root 1 0  0 Jul11 ?00:00:00 /bin/sh -c /sbin/init.sh
root 7 1  0 Jul11 ?00:00:02 runsvdir -P /etc/service
root 8 7  0 Jul11 ?00:00:00 runsv flink
root   629 0  0 Jul12 pts/000:00:00 /bin/bash
root   789 8  1 Jul12 ?00:09:16 
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtaskmanager.numberOfTaskSlots=1 
-Dparallelism.default=1 -Djobmanager.rpc.address=localhost 
-Dmesos.resourcemanager.framework.role=* 
-Dsecurity.kerberos.login.use-ticket-cache=true
root  1027 0  0 12:54 ?00:00:00 /bin/ps -efww

Then on the VM itself I see another process with the same command line as the 
one in the container:

root 13276  9689  1 Jul12 ?00:09:18 
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtaskmanager.numberOfTaskSlots=1 
-Dparallelism.default=1 -Djobmanager.rpc.addre

Delay in REST/UI readiness during JM recovery

2018-07-30 Thread Joey Echeverria
I’m running Flink 1.5.0 in Kubernetes with HA enabled, but only a single Job 
Manager running. I’m using Zookeeper to store the fencing/leader information 
and S3 to store the job manager state. We’ve been running around 250 or so 
streaming jobs and we’ve noticed that if the job manager pod is deleted, it 
takes something like 20-45 minutes for the job manager’s REST endpoints and web 
UI to become available. Until it becomes available, we get a 503 response from 
the HTTP server with the message "Could not retrieve the redirect address of 
the current leader. Please try to refresh.”.

Has anyone else run into this?

Are there any configuration settings I should be looking at to speed up the 
availability of the HTTP endpoints?

Thanks!

-Joey

Re: flink_taskmanager_job_task_operator_records_lag_max == -Inf on Flink 1.4.2

2018-07-30 Thread Julio Biason
Hey Gordon,

(Reviving this long thread) I think I found part of the problem: It seems
the metric is capturing the lag from time to time and resetting the value
in-between. I managed to replicate this by attaching a SQL Sink
(JDBCOutputFormat) connecting to an outside database -- something that took
about 2 minutes to write 500 records.

I opened the ticket https://issues.apache.org/jira/browse/FLINK-9998 with a
bit more information about this ('cause I completely forgot to open a
ticket a month ago about this).

On Thu, Jun 14, 2018 at 11:31 AM, Julio Biason 
wrote:

> Hey Gordon,
>
> The job restarted somewhere in the middle of the night (I haven't checked
> why yet) and now I have this weird status of the first TaskManager with
> only one valid lag, the second with 2 and the third with none.
>
> I dunno if I could see the partition in the logs, but all "numRecordsOut"
> are increasing over time (attached the screenshot of the graphs).
>
> On Thu, Jun 14, 2018 at 5:27 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> Thanks for the extra information. So, there seems to be 2 separate issues
>> here. I’ll go through them one by one.
>>
>> I was also checking our Grafana and the metric we were using was
>> "flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max",
>> actually. "flink_taskmanager_job_task_operator_records_lag_max" seems to
>> be new (with the attempt thingy).
>>
>>
>> After looking at the code changes in FLINK-8419, this unfortunately is an
>> accidental “break” in the scope of the metric.
>> In 1.4.0, the Kafka-shipped metrics were exposed under the
>> “KafkaConsumer” metrics group. After FLINK-8419, this was changed, as you
>> observed.
>> In 1.5.0, however, I think the metrics are exposed under both patterns.
>>
>> Now, with the fact that some subtasks are returning -Inf for
>> ‘record-lag-max’:
>> If I understood the metric semantics correctly, this metric represents
>> the "max record lag across **partitions subscribed by a Kafka consumer
>> client**".
>> So, the only possibility that I could think of causing this is that either
>> the subtask does not have any partitions assigned to it, or simply there is
>> a bug with the Kafka client returning this value.
>>
>> Is it possible that you verify that all subtasks have a partition
>> assigned to it? That should be possible by just checking the job status in
>> the Web UI, and observe the numRecordsOut value for each source subtask.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 13 June 2018 at 2:05:17 PM, Julio Biason (julio.bia...@azion.com)
>> wrote:
>>
>> Hi Gordon,
>>
>> We have Kafka 0.10.1.1 running and use the flink-connector-kafka-0.10
>> driver.
>>
>> There are a bunch of flink_taskmanager_job_task_operator_* metrics,
>> including some about the committed offset for each partition. It seems I
>> have 4 different records_lag_max with different attempt_id, though, 3 with
>> -Inf and 1 with a value -- so I will need to understand Prometheus a bit
>> better to extract this properly.
>>
>> I was also checking our Grafana and the metric we were using was
>> "flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max",
>> actually. "flink_taskmanager_job_task_operator_records_lag_max" seems to
>> be new (with the attempt thingy).
>>
>> On the "KafkaConsumer" front, but it only has the "commited_offset" for
>> each partition.
>>
>> On Wed, Jun 13, 2018 at 5:41 AM, Tzu-Li (Gordon) Tai > > wrote:
>>
>>> Hi,
>>>
>>> Which Kafka version are you using?
>>>
>>> AFAIK, the only recent changes to Kafka connector metrics in the 1.4.x
>>> series would be FLINK-8419 [1].
>>> The ‘records_lag_max’ metric is a Kafka-shipped metric simply forwarded
>>> from the internally used Kafka client, so nothing should have been affected.
>>>
>>> Do you see other metrics under the pattern of
>>> ‘flink_taskmanager_job_task_operator_*’? All Kafka-shipped metrics
>>> should still follow this pattern.
>>> If not, could you find the ‘records_lag_max’ metric (or any other
>>> Kafka-shipped metrics [2]) under the user scope ‘KafkaConsumer’?
>>>
>>> The above should provide more insight into what may be wrong here.
>>>
>>> - Gordon
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-8419
>>> [2] https://docs.confluent.io/current/kafka/monitoring.html#fetch-metrics
>>>
>>> On 12 June 2018 at 11:47:51 PM, Julio Biason (julio.bia...@azion.com)
>>> wrote:
>>>
>>> Hey guys,
>>>
>>> I just updated our Flink install from 1.4.0 to 1.4.2, but our Prometheus
>>> monitoring is not getting the current Kafka lag.
>>>
>>> After updating to 1.4.2 and making the symlink between
>>> opt/flink-metrics-prometheus-1.4.2.jar to lib/, I got the metrics back
>>> on Prometheus, but the most important one, 
>>> flink_taskmanager_job_task_operator_records_lag_max
>>> is now returning -Inf.
>>>
>>> Did I miss something?
>>>
>>> --
>>> *Julio Biason*, Sofware Engineer
>>> *AZION*  |  Deliver. Accelerate. Protect.
>>> Office: +55 51 3083 8101   |  Mobile: +55 51
>>> *99907 0554*
>>>
>>>
>>

Counting elements that appear "behind" the watermark

2018-07-30 Thread Julio Biason
Hello,

Our current watermark model is "some time behind the most recent seen
element" (very close to what the docs have in "Periodic Watermark"
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks).
It fits our current processing model.

The thing is, we want to extract information about elements appearing behind
the watermark, to give us some insight into when we need to adjust how far
behind the most recently seen element we set the watermark. The problem is, I
can't create any metrics inside the AssignerWithPeriodicWatermarks 'cause
it has no `getRuntime()` to attach the metric.

Is there any way we can count those (a ProcessFunction before the
.assignTimestampsAndWatermarks(), maybe)?

-- 
*Julio Biason*, Sofware Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101   |  Mobile: +55 51
*99907 0554*


Re: flink command line with save point job is not being submitted

2018-07-30 Thread vino yang
Hi Darshan,

This is a known problem with Flink: no specific exception information is given,
which makes diagnosis more difficult.
My guess is that you are using a local file system, and that this may be the
cause of the problem.
Can you point the savepoint to an HDFS location that the cluster has permission
to access?
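
For example (the HDFS path and savepoint directory name are placeholders; the
-s argument should point at one concrete, completed savepoint or checkpoint
directory that the JobManager can read, not at the parent checkpoint folder):

bin/flink run -d -c st -s hdfs:///flink/savepoints/savepoint-c70ab5-0123456789ab \
    ./target/poc-1.0-SNAPSHOT-jar-with-dependencies.jar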

Thanks, vino.

2018-07-30 23:23 GMT+08:00 Darshan Singh :

> I am trying to submit a job with the savepoint/checkpoint and it is
> failing with below error. Without -s flag it works fine. Am i missing
> something here?
>
>
> Thanks
>
> >bin/flink run -d  -c st -s file:///tmp/db/checkpoint/
> ./target/poc-1.0-SNAPSHOT-jar-with-dependencies.jar
>
> Starting execution of program
>
>
> 
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: Could not
> submit job c70ab528e98178bb7b9d8c622511e9f5.
>
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(
> RestClusterClient.java:247)
>
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:464)
>
> at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(
> DetachedEnvironment.java:77)
>
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:410)
>
> at org.apache.flink.client.cli.CliFrontend.executeProgram(
> CliFrontend.java:785)
>
> at org.apache.flink.client.cli.CliFrontend.runProgram(
> CliFrontend.java:279)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>
> at org.apache.flink.client.cli.CliFrontend.parseParameters(
> CliFrontend.java:1025)
>
> at org.apache.flink.client.cli.CliFrontend.lambda$main$9(
> CliFrontend.java:1101)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1836)
>
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(
> HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
>
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
>
> at org.apache.flink.client.program.rest.RestClusterClient.lambda$
> submitJob$8(RestClusterClient.java:370)
>
> at java.util.concurrent.CompletableFuture.uniExceptionally(
> CompletableFuture.java:870)
>
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(
> CompletableFuture.java:852)
>
> at java.util.concurrent.CompletableFuture.postComplete(
> CompletableFuture.java:474)
>
> at java.util.concurrent.CompletableFuture.completeExceptionally(
> CompletableFuture.java:1977)
>
> at org.apache.flink.runtime.concurrent.FutureUtils.lambda$
> retryOperationWithDelay$5(FutureUtils.java:214)
>
> at java.util.concurrent.CompletableFuture.uniWhenComplete(
> CompletableFuture.java:760)
>
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(
> CompletableFuture.java:736)
>
> at java.util.concurrent.CompletableFuture.postComplete(
> CompletableFuture.java:474)
>
> at java.util.concurrent.CompletableFuture.postFire(
> CompletableFuture.java:561)
>
> at java.util.concurrent.CompletableFuture$UniCompose.
> tryFire(CompletableFuture.java:929)
>
> at java.util.concurrent.CompletableFuture$Completion.
> run(CompletableFuture.java:442)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Exception is not retryable.
>
> at java.util.concurrent.CompletableFuture.encodeRelay(
> CompletableFuture.java:326)
>
> at java.util.concurrent.CompletableFuture.completeRelay(
> CompletableFuture.java:338)
>
> at java.util.concurrent.CompletableFuture.uniRelay(
> CompletableFuture.java:911)
>
> at java.util.concurrent.CompletableFuture$UniRelay.
> tryFire(CompletableFuture.java:899)
>
> ... 12 more
>
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Exception is not retryable.
>
> ... 10 more
>
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Job submission
> failed.]
>
> at java.util.concurrent.CompletableFuture.encodeRelay(
> CompletableFuture.java:326)
>
> at java.util.concurrent.CompletableFuture.completeRelay(
> CompletableFuture.java:338)
>
> at java.util.concurrent.CompletableFuture.uniRelay(
> CompletableFuture.java:911)
>
> at java.util.concurrent.CompletableFuture.uniCompose(
> CompletableFuture.java:953)
>
> at java.util.concurrent.CompletableFuture$UniCompose.
> tryFire(CompletableFuture.java:926)
>
> ... 4 more
>
> Caused by: org.apache.flink.runtime.rest.util.RestClientEx

Re: Committing Kafka Transactions during Savepoint

2018-07-30 Thread vino yang
Hi Scott,

For EXACTLY_ONCE on the sink side with the Kafka 0.11+ producer, the answer is
yes. The official documentation gives a good overview of this topic [1].

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
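
A minimal Scala sketch of enabling the exactly-once semantic on the 0.11
producer (the topic name and broker address are assumptions; see the linked
documentation for the transaction-timeout caveats):

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.{KeyedSerializationSchemaWrapper, SimpleStringSchema}

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092")  // assumption

// EXACTLY_ONCE wraps writes in Kafka transactions that are committed when the
// corresponding Flink checkpoint completes.
val producer = new FlinkKafkaProducer011[String](
  "output-topic",
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
  props,
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)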

Thanks, vino.



2018-07-27 22:53 GMT+08:00 Scott Kidder :

> Thank you, Aljoscha! Are Kafka transactions committed when a running job
> has been instructed to cancel with a savepoint (e.g. `flink cancel -s
> `)? This is my primary use for savepoints. I would expect that when a
> new job is submitted with the savepoint, as in the case of an application
> upgrade, Flink will create a new Kafka transaction and processing will be
> exactly-once.
>
> --Scott Kidder
>
> On Fri, Jul 27, 2018 at 5:09 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> this has been in the back of my head for a while now. I finally created a
>> Jira issue: https://issues.apache.org/jira/browse/FLINK-9983
>>
>> In there, I also outline a better fix that will take a bit longer to
>> implement.
>>
>> Best,
>> Aljoscha
>>
>> On 26. Jul 2018, at 23:04, Scott Kidder  wrote:
>>
>> I recently began using the exactly-once processing semantic with the
>> Kafka 0.11 producer in Flink 1.4.2. It's been working great!
>>
>> Are Kafka transactions committed when creating a Flink savepoint? How
>> does this affect the recovery behavior in Flink if, before the completion
>> of the next checkpoint, the application is restarted and restores from a
>> checkpoint taken before the savepoint? It seems like this might lead to the
>> Kafka producer writing a message multiple times with different committed
>> Kafka transactions.
>>
>> --
>> Scott Kidder
>>
>>
>>


flink command line with save point job is not being submitted

2018-07-30 Thread Darshan Singh
I am trying to submit a job with the savepoint/checkpoint and it is failing
with below error. Without -s flag it works fine. Am i missing something
here?


Thanks

>bin/flink run -d  -c st -s file:///tmp/db/checkpoint/
./target/poc-1.0-SNAPSHOT-jar-with-dependencies.jar

Starting execution of program




 The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: Could not
submit job c70ab528e98178bb7b9d8c622511e9f5.

at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:247)

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)

at
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)

at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)

at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)

Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.

at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)

at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)

at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)

at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)

at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)

at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)

at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
complete the operation. Exception is not retryable.

at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)

at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)

at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)

at
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)

... 12 more

Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Exception is not retryable.

... 10 more

Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.rest.util.RestClientException: [Job submission
failed.]

at
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)

at
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)

at
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)

at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)

at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)

... 4 more

Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job
submission failed.]

at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:309)

at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:293)

at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)

... 5 more


Re: Detect late data in processing time

2018-07-30 Thread vino yang
Hi Soheil,

The watermark indicates the progress of event time. It exists because there is
a time skew between event time and processing time. Hequn is correct: watermarks
cannot be used with processing time. Processing time is based on the TM's local
system clock. Usually, when there is a time field in your event that indicates
when it actually happened, we choose event time. When we choose processing time,
we don't rely on the time information carried by the data itself, so the question
is how you define "bad data".

Thanks, vino.

2018-07-30 22:29 GMT+08:00 Hequn Cheng :

> Hi Soheil,
>
> No, we can't set watermark during processing time.  And there are no late
> data considering processing time window.
> So the problem is what data is bad data when you use processing time?
> Maybe there are other ways to solve your problem.
>
> Best, Hequn
>
> On Mon, Jul 30, 2018 at 8:22 PM, Soheil Pourbafrani  > wrote:
>
>> In Event Time, we can gather bad data using OutputTag, because in Event
>> Time we have Watermark and we can detect late data. But in processing time
>> mode we don't have any watermark to detect bad data. I want to know can we
>> set watermark (for example according to taskmanager's timestamp) and use
>> processing time in creating time windows?
>>
>
>


Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell,

As far as I know, a custom partitioner will inevitably lead to a shuffle of
the data.
Even if it is bundled into the logic of the source function, would the
behavior really be any different?

Thanks, vino.

2018-07-30 20:32 GMT+08:00 Averell :

> Thanks Vino.
>
> Yes, I can do that after the source function. But that means data would be
> shuffled - sending from every source to the right partition.
> I think that by doing the partition from within the file source would help
> to save that shuffling.
>
> Thanks.
> Averell.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Detect late data in processing time

2018-07-30 Thread Hequn Cheng
Hi Soheil,

No, we can't set watermarks when using processing time, and there is no late
data for processing-time windows.
So the question is: what counts as bad data when you use processing time? Maybe
there are other ways to solve your problem.

Best, Hequn

On Mon, Jul 30, 2018 at 8:22 PM, Soheil Pourbafrani 
wrote:

> In Event Time, we can gather bad data using OutputTag, because in Event
> Time we have Watermark and we can detect late data. But in processing time
> mode we don't have any watermark to detect bad data. I want to know can we
> set watermark (for example according to taskmanager's timestamp) and use
> processing time in creating time windows?
>


Re: Order of events in a Keyed Stream

2018-07-30 Thread Harshvardhan Agrawal
Thanks for the response guys.

Based on Niels' response, it seems like a keyBy immediately after reading
from the source should map all messages with the same account number to the
same slot.
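
For reference, a rough sketch of that wiring (the Position fields, topic name
and Kafka properties here are made up, and the producer still has to key the
Kafka messages by account for the per-partition ordering Niels described):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

case class Position(accountNumber: String, product: String, quantity: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "position-enrichment")

val positions: DataStream[Position] = env
  .addSource(new FlinkKafkaConsumer011[String]("positions", new SimpleStringSchema(), properties))
  // placeholder parse, assuming "account,product,quantity" per message
  .map { line => val c = line.split(','); Position(c(0), c(1), c(2).toDouble) }

// keyBy immediately after the source: all updates for one account go to the same
// keyed subtask, and their relative order from the Kafka partition is preserved
val byAccount: KeyedStream[Position, String] = positions.keyBy(_.accountNumber)
byAccount.print()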

On Sun, Jul 29, 2018 at 05:33 Renjie Liu  wrote:

> Hi,
> Another way to ensure order is by adding a logical version number for each
> message so that earlier version will not override later version. Timestamp
> depends on your ntp server works correctly.
>
> On Sun, Jul 29, 2018 at 3:52 PM Niels Basjes  wrote:
>
>> Hi,
>>
>> The basic thing is that you will only get the messages in a guaranteed
>> order if the order is maintained in all steps from creation to use.
>> In Kafka order is only guaranteed for messages in the same partition.
>> So if you need them in order by account then the producing system must
>> use the accountid as the key used to force a specific account into a
>> specific kafka partition.
>> Then the Flink Kafka source will read them sequentially in the right
>> order, but in order to KEEP them in that order you should really do a keyBy
>> immediately after reading and use only keyed streams from that point
>> onwards.
>> As soon as you do shuffle or key by a different key then the ordering
>> within an account is no longer guaranteed.
>>
>> In general I always put a very accurate timestamp in all of my events
>> (epoch milliseconds, in some cases even epoch microseconds) so I can always
>> check if an order problem occurred.
>>
>> Niels Basjes
>>
>> On Sun, Jul 29, 2018 at 9:25 AM, Congxian Qiu 
>> wrote:
>>
>>> Hi,
>>> Maybe the messages of the same key should be in the *same partition* of
>>> Kafka topic
>>>
>>> 2018-07-29 11:01 GMT+08:00 Hequn Cheng :
>>>
 Hi harshvardhan,
 If 1.the messages exist on the same topic and 2.there are no rebalance
 and 3.keyby on the same field with same value, the answer is yes.

 Best, Hequn

 On Sun, Jul 29, 2018 at 3:56 AM, Harshvardhan Agrawal <
 harshvardhan.ag...@gmail.com> wrote:

> Hey,
>
> The messages will exist on the same topic. I intend to keyby on the
> same field. The question is that will the two messages be mapped to the
> same task manager and on the same slot. Also will they be processed in
> correct order given they have the same keys?
>
> On Fri, Jul 27, 2018 at 21:28 Hequn Cheng 
> wrote:
>
>> Hi Harshvardhan,
>>
>> There are a number of factors to consider.
>> 1. the consecutive Kafka messages must exist in a same topic of
>> kafka.
>> 2. the data should not been rebalanced. For example, operators should
>> be chained in order to avoid rebalancing.
>> 3. if you perform keyBy(), you should keyBy on a field the consecutive
>> two messages share the same value.
>>
>> Best, Hequn
>>
>> On Sat, Jul 28, 2018 at 12:11 AM, Harshvardhan Agrawal <
>> harshvardhan.ag...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> We are currently using Flink to process financial data. We are
>>> getting position data from Kafka and we enrich the positions with 
>>> account
>>> and product information. We are using Ingestion time while processing
>>> events. The question I have is: say I key the position datasream by 
>>> account
>>> number. If I have two consecutive Kafka messages with the same account 
>>> and
>>> product info where the second one is an updated position of the first 
>>> one,
>>> does Flink guarantee that the messages will be processed on the same 
>>> slot
>>> in the same worker? We want to ensure that we don’t process them out of
>>> order.
>>>
>>> Thank you!
>>> --
>>> Regards,
>>> Harshvardhan
>>>
>>
>> --
> Regards,
> Harshvardhan
>


>>>
>>>
>>> --
>>> Blog:http://www.klion26.com
>>> GTalk:qcx978132955
>>> 一切随心
>>>
>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
> --
> Liu, Renjie
> Software Engineer, MVAD
>
-- 
Regards,
Harshvardhan


Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Thanks Vino.

Yes, I can do that after the source function. But that means data would be
shuffled - sending from every source to the right partition.
I think that by doing the partition from within the file source would help
to save that shuffling.

Thanks.
Averell.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Detect late data in processing time

2018-07-30 Thread Soheil Pourbafrani
In event time, we can gather bad data using an OutputTag, because in event
time we have watermarks and we can detect late data. But in processing time
mode we don't have any watermark to detect bad data. I want to know: can we
set a watermark (for example, according to the taskmanager's timestamp) and use
processing time when creating time windows?


Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell,

Yes, you cannot do it in the source function. I think you can call keyBy
(or apply a custom partitioner based on the node ID) right after the source.
Why do you have to do the custom partitioning inside the source function?
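
Something along these lines (a rough sketch; the record type and field names
are made up, assuming the node ID is attached to each record when the file is
read):

import org.apache.flink.streaming.api.scala._

case class LogRecord(nodeId: String, line: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// stand-in for the readFile()/file-reader output
val records: DataStream[LogRecord] =
  env.fromElements(LogRecord("node-1", "..."), LogRecord("node-2", "..."))

// hash-partition by node ID right after the source: all records of one node
// end up in the same keyed subtask
val byNode: KeyedStream[LogRecord, String] = records.keyBy(_.nodeId)
byNode.print()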

Thanks, vino.

2018-07-30 19:56 GMT+08:00 Averell :

> Thank you Vino.
>
> Yes, I went thru that official guide before posting this question. The
> problem was that I could not see any call to one of those mentioned
> partitioning methods (partitionCustom, shuffle, rebalance, rescale, or
> broadcast) in the original readFile function. I'm still trying to look into
> the code.
> There should always be a way to do it, but I hope that you / someone can
> help me with the easiest way - kind of a small customization at the very
> place that "directory monitoring" hands-over the file splits to "file
> reader".
>
> Thanks!
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


AM Delegation Token Regeneration

2018-07-30 Thread Paul Lam
Hi, 
At present, Flink distributes keytabs via YARN to the nodes that are running a
Flink job, and this might be a potential security problem. I’ve read FLINK-3670
and the corresponding mailing list discussions, and I think a more appropriate
implementation would be like Spark’s: regenerate delegation tokens in the AM so that
the containers just get the generated delegation tokens instead of the whole
keytab. Also, I noticed that the Dispatcher was introduced in FLIP-6 and one of its
functionalities is acquiring the user’s authentication tokens. So, my question is: is
delegation token regeneration part of FLIP-6? If not, will it be supported in
the future?

Best regards,
Paul Lam

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Thank you Vino.

Yes, I went thru that official guide before posting this question. The
problem was that I could not see any call to one of those mentioned
partitioning methods (partitionCustom, shuffle, rebalance, rescale, or
broadcast) in the original readFile function. I'm still trying to look into
the code.
There should always be a way to do it, but I hope that you / someone can
help me with the easiest way - kind of a small customization at the very
place that "directory monitoring" hands-over the file splits to "file
reader".

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Logs are not easy to read through webUI

2018-07-30 Thread vino yang
Hi Xinyu,

Thanks for your suggestion. I will CC this suggestion to some PMC members
of Flink.

Thanks, vino.

2018-07-30 18:03 GMT+08:00 Xinyu Zhang :

> Hi vino
>
> Yes, it's only from the perspective of performance of reading log or
> metrics.  If the logs with timestamps(e.g. jobmanager.log.2018-07-29) will
> never change, maybe blob store can cache some of them to improve
> performance.
>
> BTW, please considering to develop an API for reading logs. I think many
> flink users meet this problem.
>
> Thanks!
>
> Xinyu Zhang
>
>
> 2018年7月30日星期一,vino yang  写道:
>
>> Hi Xinyu,
>>
>> Thank you for your clarification on "periodic reading". If Flink
>> considers developing an API for reading logs, I think this is a good idea.
>>
>> Regarding the problem of TM reading logs, your idea is good from a
>> performance perspective.
>> But Flink didn't provide any web services for the TM from the beginning.
>> All the requests were passed through the JM proxy.
>> Just because of the log read performance changes, there will be major
>> changes to the architecture, and this will make the TM take on more
>> responsibilities.
>>
>> Thanks, vino.
>>
>>
>> 2018-07-30 17:34 GMT+08:00 Xinyu Zhang :
>>
>>> Thanks for your reply.  "periodic reading" means reading all logs in a
>>> given time interval. For example, my logs is daily divided, I can get all
>>> logs of yesterday through a parameter like '2018-07-29/2018-07-30'.
>>>
>>> TM which provides a web service to display information will lessen the
>>> burden of jobmanager, especially when there are many taskManagers in the
>>> flink cluster.
>>>
>>>
>>> 2018年7月30日星期一,vino yang  写道:
>>>
 Hi Xinyu,

 This is indeed a problem. Especially when the amount of logs is large,
 it may even cause the UI to stall for a long time. The same is true for
 YARN, and there is really no good way to do it at the moment.
 Thank you for your suggestion, do you mean "periodic reading" refers to
 full or incremental? If it is a full reading, I personally do not recommend
 this, which will increase the burden on JM and client, and it is
 appropriate to manually trigger it by the user. Maybe we can consider
 loading only a few logs at a time, such as incremental reads, paged
 displays, and so on.
 Regarding the second question, Flink did this because its TM does not
 provide any web services to display information, and the Web UI is
 currently bundled with JM.

 Thanks, vino.

 2018-07-30 16:33 GMT+08:00 Xinyu Zhang :

> Hi all
>
> We use flink on yarn and flink version is 1.4.
>
> When a streaming job runs for a long time, the webUI cannot show logs.
> This may be because the log size is too large.
>
> However, if we use the DailyRollingAppender to divide logs
> (granularity is `day`) in log4j.properties, we will never see the log of
> yesterday.
>
> Are there any ideas that can make reading logs easier?
>
> Maybe, we should add an interface that support for reading log by time
> interval. Besides, when we get the taskmanager logs through webUI,
> jobmanager can redirect to a URL of the taskmanager, which users can get
> the logs directly (Just like MR task), other than downloading the logs 
> from
> taskmanager and then sending logs to users.
>
> Thanks!
>
> Xinyu Zhang
>
>
>

>>


Re: Questions on Unbounded number of keys

2018-07-30 Thread Fabian Hueske
Hi Chang,

The state handle objects are not created per key but just once per function
instance.
Instead they route state accesses to the backend (JVM heap or RocksDB) for
the currently active key.
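
To make that concrete, here is a rough sketch along the lines of the snippet
quoted below (the one-hour timeout and all names are made up): the userId
handle is a field of the function instance, one per parallel subtask, and
clear() only removes the backend entry of the key the timer fired for.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class Click(sessionId: String, userId: String, timestamp: Long)

class SessionFunction extends KeyedProcessFunction[String, Click, Click] {
  // one handle per parallel subtask, not one per session ID
  lazy val userId: ValueState[String] = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("userId", classOf[String]))

  override def processElement(
      click: Click,
      ctx: KeyedProcessFunction[String, Click, Click]#Context,
      out: Collector[Click]): Unit = {
    userId.update(click.userId)
    // schedule cleanup one hour after this click (note: every call registers a
    // new timer; in practice you would track and only keep the latest one)
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 60 * 60 * 1000L)
    out.collect(click)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
      out: Collector[Click]): Unit = {
    userId.clear()  // drops only the entry for the currently active session ID
  }
}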

Best, Fabian

2018-07-30 12:19 GMT+02:00 Chang Liu :

> Hi Andrey,
>
> Thanks for your reply. My question might be silly, but there is still one
> part I would like to fully understand. For example, in the following
> example:
>
> class MyFunction extends KeyedProcessFunction[String, Click, Click] { // 
> keyed by Session ID
>   lazy val userId: ValueState[String] = getRuntimeContext.getState(
> new ValueStateDescriptor[String]("userId", 
> BasicTypeInfo.STRING_TYPE_INFO))
>
>   lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
> new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))
>
>   override def processElement(
>   click: Click,
>   context: KeyedProcessFunction[String, Click, Click]#Context,
>   out: Collector[Click])
>   : Unit = {
> // process, output, clear state if necessary
>   }
>
>   override def onTimer(
>   timestamp: Long,
>   ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
>   out: Collector[Click])
>   : Unit = {
> // output and clear state
>   }
> }
>
>
> Even though I am regularly clearing the two states, userId and clicks
> (which means I am cleaning up the values stored in the States), my question
> is: then what about the two State objects themselves: userId and clicks?
> These States objects are also created per Session ID right? If the number
> of Session IDs are unbounded, than the number of these State objects are
> also unbounded.
>
> That means, there are *userId-state-1 and clicks-state-1 for session-id-1*,
> *userId-state-2 and clicks-state-2 for session-id-2*, *userId-state-3 and
> clicks-state-3 for session-id-3*, …, which are handled by different (or
> same if two from different *range*, as you call it, are assigned to the
> same one) keyed operator instance.
>
> I am not concerning the actual value in the State (which will be managed
> carefully, if I am clearing them carefully). I am thinking about the State
> objects themselves, which I have no idea what is happening to them and what
> will happen to them.
>
> Many thanks :)
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
> On 26 Jul 2018, at 10:55, Andrey Zagrebin 
> wrote:
>
> Hi Chang Liu,
>
> The unbounded nature of the stream keyed or not should not lead to out of
> memory.
>
> Flink parallel keyed operator instances have fixed number (parallelism)
> and just process some range of keyed elements, in your example it is a
> subrange of session ids.
>
> The keyed processed elements (http requests) are objects created when they
> enter the pipeline and garbage collected after having been processed in
> streaming fashion.
>
> If they arrive very rapidly it can lead to high back pressure from
> upstream to downstream operators, buffers can become full and pipeline
> stops/slows down processing external inputs, it usually means that your
> pipeline is under provisioned.
>
> The only accumulated data comes from state (windows, user state etc), so
> if you control its memory consumption, as Till described, there should be
> no other source of out of memory.
>
> Cheers,
> Andrey
>
> On 25 Jul 2018, at 19:06, Chang Liu  wrote:
>
> Hi Till,
>
> Thanks for your reply. But I think maybe I did not make my question clear.
> My question is not about whether the States within each keyed operator
> instances will run out of memory. My question is about, whether the
> unlimited keyed operator instances themselves will run out of memory.
>
> So to reply to your answers, no matter using different State backends or
> regularly cleaning up the States (which is exactly what I am doing), it
> does not concern the number of keyed operator instances.
>
> I would like to know:
>
>- Will the number of keyed operator instances (Java objects?) grow
>unbounded?
>- If so, will they run out of memory? This is not actually related to
>the memory used by the keyed Stated inside.
>- If not, then how Flink is managing this multiple keyed operator
>instances?
>
>
> I think this needs more knowledge about how Flink works internally to
> understand how keyed operator instances are created, maintained and
> destroyed. That’s why I would like your help understanding this.
>
> Many Thanks.
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
> On 24 Jul 2018, at 14:31, Till Rohrmann  wrote:
>
> Hi Chang Liu,
>
> if you are dealing with an unlimited number of keys and keep state around
> for every key, then your state size will keep growing with the number of
> keys. If you are using the FileStateBackend which keeps state in memory,
> you will eventually run into an OutOfMemoryException. One way to
> solve/mitigate this problem is to use the RocksDBStateBackend which can go
> out of core.
>
> Alternatively, you would need to clean up your state before you run out of
>

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell,

Did you know Flink allows you to customize the partitioner?

Some resources:

official documentation :
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/#physical-partitioning
discussing in mailing list :
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/keyBy-using-custom-partitioner-td5379.html
a code example : https://gist.github.com/chiwanpark/e71d27cc8edae8bc7298
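
For your case, a rough (untested) sketch could look like the following,
assuming each record carries the node ID parsed from the file name (all names
here are made up):

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

case class LogRecord(nodeId: String, line: String)

// route all records of one node to the same downstream channel
class NodeIdPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    ((key.hashCode % numPartitions) + numPartitions) % numPartitions  // non-negative index
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

// stand-in for the file source
val records: DataStream[LogRecord] =
  env.fromElements(LogRecord("node-1", "..."), LogRecord("node-2", "..."))

val partitioned: DataStream[LogRecord] =
  records.partitionCustom(new NodeIdPartitioner, _.nodeId)
partitioned.print()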

Thanks, vino.


2018-07-30 18:57 GMT+08:00 Averell :

> Hi everyone,
>
> We are collecting log files from tens of thousands of network nodes, and we
> need to do some data insights using that. The files are coming with the
> corresponding node ID in the file name, and I want to do custom
> partitioning
> using that Node ID.
> Right now (with Flink 1.5) I think that is not supported. I have been
> trying
> to look into the code, but it would take some time for me to understand.
> From the GUI, it looks like the first step of file source (directory
> monitoring) is rebalancing the stream to the 2nd step (file reader). And as
> per Flink document, rebalancing means round-robin. However, I could not
> find
> the call of "rebalancing" method, but "transform" is called. Not much
> information about that "transform" method though.
>
> Would it possible for me to ask for some guideline on this?
>
> Thanks for your help.
> Averell
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Hi everyone,

We are collecting log files from tens of thousands of network nodes, and we
need to do some data insights using that. The files are coming with the
corresponding node ID in the file name, and I want to do custom partitioning
using that Node ID.
Right now (with Flink 1.5) I think that is not supported. I have been trying
to look into the code, but it would take some time for me to understand.
From the GUI, it looks like the first step of file source (directory
monitoring) is rebalancing the stream to the 2nd step (file reader). And as
per Flink document, rebalancing means round-robin. However, I could not find
the call of "rebalancing" method, but "transform" is called. Not much
information about that "transform" method though.

Would it be possible for me to ask for some guidance on this?

Thanks for your help.
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 file source - continuous monitoring - many files missed

2018-07-30 Thread Averell
Here is my implementation of the change: https://github.com/lvhuyen/flink
3 files were updated: StreamExecutionEnvironment.java, StreamExecutionEnvironment.scala, and
ContinuousFileMonitoringFunction.java.

All the thanks to Fabian.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Running a Python streaming job with Java dependencies

2018-07-30 Thread Chesnay Schepler
To use Java classes not bundled with Flink you will have to place a jar
containing said classes into the /lib directory of the distribution.


On 25.07.2018 23:32, Joe Malt wrote:

Hi,

I'm trying to run a job with Flink's new Python streaming API but I'm 
running into issues with Java imports.


I have a Jython project in IntelliJ with a lot of Java dependencies 
configured through Maven. I can't figure out how to make Flink "see" 
these dependencies.


An example script that exhibits the problem is the following (it's the 
streaming example from the docs 
(https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#streaming-program-example) 
but with an extra import added)


from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

# Added an extra import, this fails with an ImportError
import com.google.gson.GsonBuilder


class Generator(SourceFunction):
    def __init__(self, num_iters):
        self._running = True
        self._num_iters = num_iters
    # ... rest of the file is as in the documentation

This runs without any exceptions when run from IntelliJ (assuming 
com.google.gson is added in the POM), but when I try to run it as a 
Flink job with this command:


./pyflink-stream.sh ~/flink-python/MinimalExample.py - --local

it fails to find the dependency:

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "", line 1, in 
  File 
"/var/folders/t1/gcltcjcn5zdgqfqrc32xk90x85xkg9/T/flink_streaming_plan_0bfab09c-baeb-414f-a718-01a5c71b3507/MinimalExample.py", 
line 7, in 

ImportError: No module named google

How can I point pyflink-stream.sh to these Maven dependencies? I've 
tried modifying the script to add my .m2/ directory to the classpath 
(using flink run -C), but that didn't make any difference:


bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

"$FLINK_BIN_DIR"/flink run -C "file:///Users/jmalt/.m2/" --class 
org.apache.flink.streaming.python.api.PythonStreamBinder -v 
"$FLINK_ROOT_DIR"/opt/flink-streaming-python*.jar "$@"



Thanks,

Joe Malt

Engineering Intern, Stream Processing
Yelp





Flink job is not reading from certain kafka topic partitions

2018-07-30 Thread vivekyadav01
We have a Flink stream job that uses FlinkKafkaConsumer010. We keep the
Kafka consumer operator's parallelism the same as the number of partitions in
the Kafka topic so that each partition is attached to one subtask.
Quite frequently the job stops reading from certain partitions. On
investigating under the job metrics tab (committed offsets & current offsets
metrics), a few partitions are not associated with any subtask while a few
partitions are associated with more than one subtask (which seems like wrong
behavior). I have debugged it locally and seen that initially the consumer
starts fine with a uniform association with partitions, but over time some
partitions disassociate from their subtask.

Here is our kafka consumer config
-checkpoint disabled.
-enable.auto.commit true
-auto.commit.interval.ms 5 minutes
-request.timeout.ms 3 minutes

Kafka Version: 0.10.1.0
Flink version: 1.3.1

Has someone else faced this issue? Any help or pointers would be much
appreciated.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink job is not reading or stops reading from certain kafka topic partitions

2018-07-30 Thread vivekyadav01
We have a Flink stream job that uses FlinkKafkaConsumer010. We keep the
Kafka consumer operator's parallelism the same as the number of partitions in
the Kafka topic so that each partition is attached to one subtask.
Quite frequently the job stops reading from certain partitions. On
investigating under the job metrics tab (committed offsets & current offsets
metrics), a few partitions are not associated with any subtask while a few
partitions are associated with more than one subtask (which seems like wrong
behavior). I have debugged it locally and seen that initially the consumer
starts fine with a uniform association with partitions, but over time some
partitions disassociate from their subtask.

Here is our kafka consumer config
-checkpoint disabled.
-enable.auto.commit true
-auto.commit.interval.ms 5 minutes
-request.timeout.ms 3 minutes

Kafka Version: 0.10.1.0
Flink version: 1.3.1

Has someone else faced this issue? Any help or pointers would be much
appreciated.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Questions on Unbounded number of keys

2018-07-30 Thread Chang Liu
Hi Andrey,

Thanks for your reply. My question might be silly, but there is still one part 
I would like to fully understand. For example, in the following example:

class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed 
by Session ID
  lazy val userId: ValueState[String] = getRuntimeContext.getState(
new ValueStateDescriptor[String]("userId", BasicTypeInfo.STRING_TYPE_INFO))

  lazy val clicks: ListState[Click] = getRuntimeContext.getListState(
new ListStateDescriptor[Click]("clicks", createTypeInformation[Click]))

  override def processElement(
  click: Click,
  context: KeyedProcessFunction[String, Click, Click]#Context,
  out: Collector[Click])
  : Unit = {
// process, output, clear state if necessary
  }

  override def onTimer(
  timestamp: Long,
  ctx: KeyedProcessFunction[String, Click, Click]#OnTimerContext,
  out: Collector[Click])
  : Unit = {
// output and clear state
  }
}

Even though I am regularly clearing the two states, userId and clicks (which 
means I am cleaning up the values stored in the States), my question is: then 
what about the two State objects themselves: userId and clicks? These State
objects are also created per Session ID, right? If the number of Session IDs is
unbounded, then the number of these State objects is also unbounded.

That means, there are userId-state-1 and clicks-state-1 for session-id-1, 
userId-state-2 and clicks-state-2 for session-id-2, userId-state-3 and 
clicks-state-3 for session-id-3, …, which are handled by different (or same if 
two from different range, as you call it, are assigned to the same one) keyed 
operator instance.

I am not concerned about the actual values in the State (which will be managed
carefully, if I am clearing them carefully). I am thinking about the State
objects themselves; I have no idea what is happening to them and what
will happen to them.

Many thanks :)

Best regards/祝好,

Chang Liu 刘畅


> On 26 Jul 2018, at 10:55, Andrey Zagrebin  wrote:
> 
> Hi Chang Liu,
> 
> The unbounded nature of the stream keyed or not should not lead to out of 
> memory. 
> 
> Flink parallel keyed operator instances have fixed number (parallelism) and 
> just process some range of keyed elements, in your example it is a subrange 
> of session ids. 
> 
> The keyed processed elements (http requests) are objects created when they 
> enter the pipeline and garbage collected after having been processed in 
> streaming fashion. 
> 
> If they arrive very rapidly it can lead to high back pressure from upstream 
> to downstream operators, buffers can become full and pipeline stops/slows 
> down processing external inputs, it usually means that your pipeline is under 
> provisioned. 
> 
> The only accumulated data comes from state (windows, user state etc), so if 
> you control its memory consumption, as Till described, there should be no 
> other source of out of memory.
> 
> Cheers,
> Andrey
> 
>> On 25 Jul 2018, at 19:06, Chang Liu > > wrote:
>> 
>> Hi Till,
>> 
>> Thanks for your reply. But I think maybe I did not make my question clear. 
>> My question is not about whether the States within each keyed operator 
>> instances will run out of memory. My question is about, whether the 
>> unlimited keyed operator instances themselves will run out of memory.
>> 
>> So to reply to your answers, no matter using different State backends or 
>> regularly cleaning up the States (which is exactly what I am doing), it does 
>> not concern the number of keyed operator instances.
>> 
>> I would like to know:
>> Will the number of keyed operator instances (Java objects?) grow unbounded? 
>> If so, will they run out of memory? This is not actually related to the 
>> memory used by the keyed Stated inside.
>> If not, then how Flink is managing this multiple keyed operator instances?
>> 
>> I think this needs more knowledge about how Flink works internally to 
>> understand how keyed operator instances are created, maintained and 
>> destroyed. That’s why I would like your help understanding this.
>> 
>> Many Thanks.
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
>>> On 24 Jul 2018, at 14:31, Till Rohrmann >> > wrote:
>>> 
>>> Hi Chang Liu,
>>> 
>>> if you are dealing with an unlimited number of keys and keep state around 
>>> for every key, then your state size will keep growing with the number of 
>>> keys. If you are using the FileStateBackend which keeps state in memory, 
>>> you will eventually run into an OutOfMemoryException. One way to 
>>> solve/mitigate this problem is to use the RocksDBStateBackend which can go 
>>> out of core.
>>> 
>>> Alternatively, you would need to clean up your state before you run out of 
>>> memory. One way to do this is to register for every key a timer which 
>>> clears the state. But this only works if you don't amass too much state 
>>> data before the timer is triggered. If you wish this solution 

Re: Logs are not easy to read through webUI

2018-07-30 Thread Xinyu Zhang
Hi vino

Yes, it's only from the perspective of the performance of reading logs or
metrics. If the logs with timestamps (e.g. jobmanager.log.2018-07-29) will
never change, maybe the blob store can cache some of them to improve
performance.

BTW, please consider developing an API for reading logs. I think many
Flink users meet this problem.

Thanks!

Xinyu Zhang

2018年7月30日星期一,vino yang  写道:

> Hi Xinyu,
>
> Thank you for your clarification on "periodic reading". If Flink considers
> developing an API for reading logs, I think this is a good idea.
>
> Regarding the problem of TM reading logs, your idea is good from a
> performance perspective.
> But Flink didn't provide any web services for the TM from the beginning.
> All the requests were passed through the JM proxy.
> Just because of the log read performance changes, there will be major
> changes to the architecture, and this will make the TM take on more
> responsibilities.
>
> Thanks, vino.
>
>
> 2018-07-30 17:34 GMT+08:00 Xinyu Zhang :
>
>> Thanks for your reply.  "periodic reading" means reading all logs in a
>> given time interval. For example, my logs is daily divided, I can get all
>> logs of yesterday through a parameter like '2018-07-29/2018-07-30'.
>>
>> TM which provides a web service to display information will lessen the
>> burden of jobmanager, especially when there are many taskManagers in the
>> flink cluster.
>>
>>
>> 2018年7月30日星期一,vino yang  写道:
>>
>>> Hi Xinyu,
>>>
>>> This is indeed a problem. Especially when the amount of logs is large,
>>> it may even cause the UI to stall for a long time. The same is true for
>>> YARN, and there is really no good way to do it at the moment.
>>> Thank you for your suggestion, do you mean "periodic reading" refers to
>>> full or incremental? If it is a full reading, I personally do not recommend
>>> this, which will increase the burden on JM and client, and it is
>>> appropriate to manually trigger it by the user. Maybe we can consider
>>> loading only a few logs at a time, such as incremental reads, paged
>>> displays, and so on.
>>> Regarding the second question, Flink did this because its TM does not
>>> provide any web services to display information, and the Web UI is
>>> currently bundled with JM.
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-30 16:33 GMT+08:00 Xinyu Zhang :
>>>
 Hi all

 We use flink on yarn and flink version is 1.4.

 When a streaming job runs for a long time, the webUI cannot show logs.
 This may be because the log size is too large.

 However, if we use the DailyRollingAppender to divide logs (granularity
 is `day`) in log4j.properties, we will never see the log of yesterday.

 Are there any ideas that can make reading logs easier?

 Maybe, we should add an interface that support for reading log by time
 interval. Besides, when we get the taskmanager logs through webUI,
 jobmanager can redirect to a URL of the taskmanager, which users can get
 the logs directly (Just like MR task), other than downloading the logs from
 taskmanager and then sending logs to users.

 Thanks!

 Xinyu Zhang



>>>
>


Re: Logs are not easy to read through webUI

2018-07-30 Thread vino yang
Hi Xinyu,

Thank you for your clarification on "periodic reading". If Flink considers
developing an API for reading logs, I think this is a good idea.

Regarding the problem of reading logs from the TM, your idea is good from a
performance perspective.
But Flink hasn't provided any web services for the TM from the beginning;
all requests were passed through the JM proxy.
Changing that just for log-reading performance would mean major
changes to the architecture, and it would make the TM take on more
responsibilities.

Thanks, vino.


2018-07-30 17:34 GMT+08:00 Xinyu Zhang :

> Thanks for your reply.  "periodic reading" means reading all logs in a
> given time interval. For example, my logs is daily divided, I can get all
> logs of yesterday through a parameter like '2018-07-29/2018-07-30'.
>
> TM which provides a web service to display information will lessen the
> burden of jobmanager, especially when there are many taskManagers in the
> flink cluster.
>
>
> 2018年7月30日星期一,vino yang  写道:
>
>> Hi Xinyu,
>>
>> This is indeed a problem. Especially when the amount of logs is large, it
>> may even cause the UI to stall for a long time. The same is true for YARN,
>> and there is really no good way to do it at the moment.
>> Thank you for your suggestion, do you mean "periodic reading" refers to
>> full or incremental? If it is a full reading, I personally do not recommend
>> this, which will increase the burden on JM and client, and it is
>> appropriate to manually trigger it by the user. Maybe we can consider
>> loading only a few logs at a time, such as incremental reads, paged
>> displays, and so on.
>> Regarding the second question, Flink did this because its TM does not
>> provide any web services to display information, and the Web UI is
>> currently bundled with JM.
>>
>> Thanks, vino.
>>
>> 2018-07-30 16:33 GMT+08:00 Xinyu Zhang :
>>
>>> Hi all
>>>
>>> We use flink on yarn and flink version is 1.4.
>>>
>>> When a streaming job runs for a long time, the webUI cannot show logs.
>>> This may be because the log size is too large.
>>>
>>> However, if we use the DailyRollingAppender to divide logs (granularity
>>> is `day`) in log4j.properties, we will never see the log of yesterday.
>>>
>>> Are there any ideas that can make reading logs easier?
>>>
>>> Maybe, we should add an interface that support for reading log by time
>>> interval. Besides, when we get the taskmanager logs through webUI,
>>> jobmanager can redirect to a URL of the taskmanager, which users can get
>>> the logs directly (Just like MR task), other than downloading the logs from
>>> taskmanager and then sending logs to users.
>>>
>>> Thanks!
>>>
>>> Xinyu Zhang
>>>
>>>
>>>
>>


Re: watermark VS window trigger

2018-07-30 Thread vino yang
Hi Soheil,

I feel that some of your understanding is a bit problematic.

*"After that according to the current watermark, data with the timestamp
between the last watermark and current watermark will be released and go to
the next steps"*

The main role of the watermark here is to define the progress of event
time, which serves as the time base for triggering the window. Upstream of
the time window, the assigner only generates a watermark at a specific
interval, and the watermark then advances the watermark of downstream tasks
as it flows downstream.

You can read "event time & watermark" documentation [1].

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html
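
To illustrate the two calls, a rough sketch (this is not your
SampleTimestampExtractor; the Reading type and the 100 ms bound are made up):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

case class Reading(key: String, timestamp: Long, value: Double)

class PeriodicAssigner extends AssignerWithPeriodicWatermarks[Reading] {
  private val maxOutOfOrdernessMs = 100L
  private var maxTs = Long.MinValue

  // called for every record: the record itself is forwarded immediately,
  // it is never held back waiting for a watermark
  override def extractTimestamp(r: Reading, previousTs: Long): Long = {
    maxTs = math.max(maxTs, r.timestamp)
    r.timestamp
  }

  // called on the auto-watermark interval (200 ms by default), independent of
  // records; the emitted watermark only advances event time downstream and
  // thereby decides when windows fire
  override def getCurrentWatermark(): Watermark =
    new Watermark(if (maxTs == Long.MinValue) Long.MinValue else maxTs - maxOutOfOrdernessMs)
}

It is attached with assignTimestampsAndWatermarks(new PeriodicAssigner), just
like your SampleTimestampExtractor.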

Thanks, vino.


2018-07-30 17:05 GMT+08:00 Soheil Pourbafrani :

> Suppose we have a time window of 10 milliseconds and we use EventTime.
> First, we determine how Flink can get time and watermark from
> incoming messages, after that, we set a key for the stream and set a time
> window.
>
> aggregatedTuple
> .assignTimestampsAndWatermarks(new SampleTimestampExtractor())
> 
> .keyBy(1).timeWindow(Time.milliseconds(1000))/*.countWindow(3)*/
> .reduce()
>
> My understanding of the data flow in this scenario is the following:
>
> Flink advanced time according to the timestamp of Incoming data into the
> aggregatedTuple variable while for each message get the timestamp and
> watermark.
> As I use Periodic Watermarks, according to default watermark interval
> (200ms), watermarks will be updated. After that according to the
> current watermark, data with the timestamp between the last watermark and
> current watermark will be released and go to the next steps (keyBy,
> timeWindow, reduce). If Flink received a data but an appropriate watermark
> didn't emit for that data yet, Flink didn't send that data to the next
> steps and keep it until it's appropriate watermark will be emitted.
>
> Is that correct?
>


Logs are not easy to read through webUI

2018-07-30 Thread Xinyu Zhang
Thanks for your reply. "periodic reading" means reading all logs in a
given time interval. For example, my logs are divided daily, so I can get all
of yesterday's logs through a parameter like '2018-07-29/2018-07-30'.

A TM that provides a web service to display its information would lessen the
burden on the jobmanager, especially when there are many taskmanagers in the
Flink cluster.


2018年7月30日星期一,vino yang  写道:

> Hi Xinyu,
>
> This is indeed a problem. Especially when the amount of logs is large, it
> may even cause the UI to stall for a long time. The same is true for YARN,
> and there is really no good way to do it at the moment.
> Thank you for your suggestion, do you mean "periodic reading" refers to
> full or incremental? If it is a full reading, I personally do not recommend
> this, which will increase the burden on JM and client, and it is
> appropriate to manually trigger it by the user. Maybe we can consider
> loading only a few logs at a time, such as incremental reads, paged
> displays, and so on.
> Regarding the second question, Flink did this because its TM does not
> provide any web services to display information, and the Web UI is
> currently bundled with JM.
>
> Thanks, vino.
>
> 2018-07-30 16:33 GMT+08:00 Xinyu Zhang :
>
>> Hi all
>>
>> We use flink on yarn and flink version is 1.4.
>>
>> When a streaming job runs for a long time, the webUI cannot show logs.
>> This may be because the log size is too large.
>>
>> However, if we use the DailyRollingAppender to divide logs (granularity
>> is `day`) in log4j.properties, we will never see the log of yesterday.
>>
>> Are there any ideas that can make reading logs easier?
>>
>> Maybe, we should add an interface that support for reading log by time
>> interval. Besides, when we get the taskmanager logs through webUI,
>> jobmanager can redirect to a URL of the taskmanager, which users can get
>> the logs directly (Just like MR task), other than downloading the logs from
>> taskmanager and then sending logs to users.
>>
>> Thanks!
>>
>> Xinyu Zhang
>>
>>
>>
>


Re: RuntimeException with valve output watermark when using CoGroup

2018-07-30 Thread Taneli Saastamoinen
On 27 July 2018 at 19:21, Chesnay Schepler  wrote:
> At first glance this looks like a bug. Is there nothing in the stack trace
after the NullPointerException?

Hmm, there is actually, sorry about that:

Caused by: java.lang.NullPointerException
at
org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
at
org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
at
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
at
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
at
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 22 more

> How reliably can you reproduce this?

100%, though I've only run this job a handful of times. What's frustrating
is that I cannot reproduce this easily in a unit test (all my unit tests
work fine). On production data it happens every time and pretty much
instantly, but our data volumes are big enough that it's difficult to try
to dig into it further.

For now I think I'll split the job into two, have the first aggregation
write to Kafka and have the second aggregation as a separate job that reads
its input from Kafka. When I run the first aggregation only that is fine
and no errors occur, so the issue seems to be the combination of
aggregations.

Cheers,


Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks again.

The Hbase connector works fine in Flink

// Start Hbase table stuff
val tableName = "MARKETDATAHBASESPEEDFLINK"
val hbaseConf = HBaseConfiguration.create()
//  Connecting to remote Hbase
hbaseConf.set("hbase.master", hbaseHost)
hbaseConf.set("hbase.zookeeper.quorum",zookeeperHost)
hbaseConf.set("hbase.zookeeper.property.clientPort",zooKeeperClientPort)
hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
// create this table with column family
val admin = new HBaseAdmin(hbaseConf)
if(!admin.isTableAvailable(tableName))
{
  println("Creating table " + tableName)
  val tableDesc = new HTableDescriptor(tableName)
  tableDesc.addFamily(new HColumnDescriptor("PRICE_INFO".getBytes()))
  tableDesc.addFamily(new HColumnDescriptor("OPERATION".getBytes()))
  admin.createTable(tableDesc)
} else {
  println("Table " + tableName + " already exists!!")
}
val HbaseTable = new HTable(hbaseConf, tableName)
// End Hbase table stuff

So I just need to split every row into columns and put it into Hbase as
follows:

// Save prices to Hbase table
 var p = new Put(new String(key).getBytes())
 //p.add("PRICE_INFO".getBytes(), "key".getBytes(),
new String(ticker).getBytes())
 p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),
new String(ticker).getBytes())
 p.add("PRICE_INFO".getBytes(), "SSUED".getBytes(), new
String(timeissued).getBytes())
 p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),
new String(priceToString).getBytes())
 p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),
new String(CURRENCY).getBytes())
 p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),
new String(1.toString).getBytes())
 p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),
new String(System.currentTimeMillis.toString).getBytes())
 HbaseTable.put(p)
 HbaseTable.flushCommits()
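
For the split-and-filter step itself, a rough sketch in the FlatMapFunction
style Fabian suggested (untested; the two sample lines are the ones from my
earlier mail, and the print() at the end only stands in for a SinkFunction
wrapping the Put logic above):

import org.apache.flink.streaming.api.scala._

case class Price(key: String, ticker: String, timeissued: String, price: Float)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// stand-in for the FlinkKafkaConsumer011[String] stream
val lines: DataStream[String] = env.fromElements(
  "8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94",
  "81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33")

val highPrices: DataStream[Price] = lines
  .flatMap { line: String =>
    val cols = line.split(',')
    // drop malformed lines instead of failing the job
    if (cols.length == 4) List(Price(cols(0), cols(1), cols(2), cols(3).trim.toFloat)) else Nil
  }
  .filter(_.price > 90.0f)

highPrices.print()  // replace with a SinkFunction that performs the HBase Puts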


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 30 Jul 2018 at 09:58, Fabian Hueske  wrote:

> A *Table*Source [1], is a special input connector for Flink's relational
> APIs (Table API and SQL) [2].
> You can transform and filter with these APIs as well (it's probably even
> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>
> However, there is no *Table*Sink for HBase and you would need to convert
> the Table back to a DataStream [3].
> That's not very difficult since the APIs are integrated with each other.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh :
>
>> Thanks Fabian. That was very useful.
>>
>> How about an operation like below?
>>
>>  // create builder
>>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>>  // set Kafka topic
>> .forTopic(topicsValue)
>>  // set Kafka consumer properties
>> .withKafkaProperties(properties)
>>  // set Table schema
>> .withSchema(TableSchema.builder()
>> .field("key", Types.STRING)
>> .field("ticker", Types.STRING)
>> .field("timeissued", Types.STRING)
>> .field("price", Types.FLOAT)
>> .build())
>>
>> Will that be OK?
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> Flink processes streams record by record, instead of micro-batching
>>> records together. Since every record comes by itself, there is no for-each.
>>> Simple record-by-record transformations ca

Re: Logs are not easy to read through webUI

2018-07-30 Thread vino yang
Hi Xinyu,

This is indeed a problem. Especially when the amount of logs is large, it
may even cause the UI to stall for a long time. The same is true for YARN,
and there is really no good way to do it at the moment.
Thank you for your suggestion. Do you mean "periodic reading" refers to a
full or an incremental read? If it is a full read, I personally do not recommend
it, as it would increase the burden on the JM and the client, and it is more
appropriate for the user to trigger it manually. Maybe we can consider
loading only a few logs at a time, such as incremental reads, paged
displays, and so on.
Regarding the second question, Flink did this because its TM does not
provide any web services to display information, and the Web UI is
currently bundled with the JM.

Thanks, vino.

2018-07-30 16:33 GMT+08:00 Xinyu Zhang :

> Hi all
>
> We use flink on yarn and flink version is 1.4.
>
> When a streaming job runs for a long time, the webUI cannot show logs. This
> may be because the log size is too large.
>
> However, if we use the DailyRollingAppender to divide logs (granularity is
> `day`) in log4j.properties, we will never see the log of yesterday.
>
> Are there any ideas that can make reading logs easier?
>
> Maybe, we should add an interface that support for reading log by time
> interval. Besides, when we get the taskmanager logs through webUI,
> jobmanager can redirect to a URL of the taskmanager, which users can get
> the logs directly (Just like MR task), other than downloading the logs from
> taskmanager and then sending logs to users.
>
> Thanks!
>
> Xinyu Zhang
>
>
>


watermark VS window trigger

2018-07-30 Thread Soheil Pourbafrani
Suppose we have a time window of 10 milliseconds and we use EventTime.
First, we determine how Flink can get time and watermark from
incoming messages, after that, we set a key for the stream and set a time
window.

aggregatedTuple
.assignTimestampsAndWatermarks(new SampleTimestampExtractor())
.keyBy(1).timeWindow(Time.milliseconds(1000))/*.countWindow(3)*/
.reduce()

My understanding of the data flow in this scenario is the following:

Flink advances time according to the timestamps of the data incoming into the
aggregatedTuple variable, extracting a timestamp and watermark for each
message.
As I use periodic watermarks, watermarks will be updated according to the
default watermark interval (200 ms). After that, according to the current
watermark, data with timestamps between the last watermark and the current
watermark will be released and go on to the next steps (keyBy, timeWindow,
reduce). If Flink receives a record but an appropriate watermark hasn't been
emitted for it yet, Flink doesn't send that record to the next steps and keeps
it until its watermark is emitted.

Is that correct?


Re: Working out through individual messages in Flink

2018-07-30 Thread Fabian Hueske
A *Table*Source [1] is a special input connector for Flink's relational
APIs (Table API and SQL) [2].
You can transform and filter with these APIs as well (it's probably even
easier). In SQL this would be the SELECT and WHERE clauses of a query.

However, there is no *Table*Sink for HBase and you would need to convert
the Table back to a DataStream [3].
That's not very difficult since the APIs are integrated with each other.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
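
Roughly like this (an untested sketch building on the Kafka011JsonTableSource
from your mail; the "prices" table name and the print() sink are only
placeholders for your HBase-writing SinkFunction):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)

// KafkaTableSource is the Kafka011JsonTableSource built in your snippet
tableEnv.registerTableSource("prices", KafkaTableSource)

val highPrices = tableEnv
  .scan("prices")
  .filter('price > 90.0f)
  .select('key, 'ticker, 'timeissued, 'price)

// there is no HBase TableSink, so convert back to a DataStream and use a regular sink
val asStream: DataStream[Row] = tableEnv.toAppendStream[Row](highPrices)
asStream.print()  // replace with your HBase-writing SinkFunction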

2018-07-30 10:47 GMT+02:00 Mich Talebzadeh :

> Thanks Fabian. That was very useful.
>
> How about an operation like below?
>
>  // create builder
>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>  // set Kafka topic
> .forTopic(topicsValue)
>  // set Kafka consumer properties
> .withKafkaProperties(properties)
>  // set Table schema
> .withSchema(TableSchema.builder()
> .field("key", Types.STRING)
> .field("ticker", Types.STRING)
> .field("timeissued", Types.STRING)
> .field("price", Types.FLOAT)
> .build())
>
> Will that be OK?
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:
>
>> Hi,
>>
>> Flink processes streams record by record, instead of micro-batching
>> records together. Since every record comes by itself, there is no for-each.
>> Simple record-by-record transformations can be done with a MapFunction,
>> filtering out records with a FilterFunction. You can also implement a
>> FlatMapFunction to do both in one step.
>>
>> Once the stream is transformed and filtered, you can write it to HBase
>> with a sink function.
>>
>>
>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh :
>>
>>> Just to clarify these are the individual prices separated by ','. The
>>> below shows three price lines in the topic
>>>
>>> UUID,Security, Time,Price
>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh 
>>> wrote:
>>>

 Hi,

 I have a Kafka topic that transmits 100 security prices ever 2 seconds.

 In Spark streaming I go through the RDD and walk through rows one by
 one and check prices
 In prices are valuable I store them into an Hbase table

 val dstream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
 dstream.cache()
 dstream.foreachRDD
 { pricesRDD =>
   // Work on individual messages
   *   for(line <- pricesRDD.collect.toArray)*
  {
var key = line._2.split(',').view(0).toString
var ticker =  line._2.split(',').view(1).toString
var timeissued = line._2.split(',').view(2).toString
var price = line._2.split(',').view(3).toFloat
val priceToString = line._2.split(',').view(3)
 if (price > 90.0)
{
//save to Hbase table
}
   }
  }

 That works fine.

 In Flink I define my source as below

 val streamExecEnv = StreamExecutionEnvironment.
 getExecutionEnvironment
 streamExecEnv.setStreamTi

Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks Fabian. That was very useful.

How about an operation like below?

 // create builder
 val KafkaTableSource = Kafka011JsonTableSource.builder()
 // set Kafka topic
.forTopic(topicsValue)
 // set Kafka consumer properties
.withKafkaProperties(properties)
 // set Table schema
.withSchema(TableSchema.builder()
.field("key", Types.STRING)
.field("ticker", Types.STRING)
.field("timeissued", Types.STRING)
.field("price", Types.FLOAT)
.build())

Will that be OK?


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:

> Hi,
>
> Flink processes streams record by record, instead of micro-batching
> records together. Since every record comes by itself, there is no for-each.
> Simple record-by-record transformations can be done with a MapFunction,
> filtering out records with a FilterFunction. You can also implement a
> FlatMapFunction to do both in one step.
>
> Once the stream is transformed and filtered, you can write it to HBase
> with a sink function.
>
>
> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh :
>
>> Just to clarify these are the individual prices separated by ','. The
>> below shows three price lines in the topic
>>
>> UUID,Security, Time,Price
>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>>
>>> In Spark Streaming I go through the RDD and walk through the rows one by one
>>> and check prices.
>>> If prices are valuable I store them into an HBase table.
>>>
>>> val dstream = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>> dstream.cache()
>>> dstream.foreachRDD
>>> { pricesRDD =>
>>>   // Work on individual messages
>>>   *   for(line <- pricesRDD.collect.toArray)*
>>>  {
>>>var key = line._2.split(',').view(0).toString
>>>var ticker =  line._2.split(',').view(1).toString
>>>var timeissued = line._2.split(',').view(2).toString
>>>var price = line._2.split(',').view(3).toFloat
>>>val priceToString = line._2.split(',').view(3)
>>> if (price > 90.0)
>>>{
>>>//save to Hbase table
>>>}
>>>   }
>>>  }
>>>
>>> That works fine.
>>>
>>> In Flink I define my source as below
>>>
>>> val streamExecEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> val stream = streamExecEnv
>>>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>>> SimpleStringSchema(), properties))
>>>
>>> Is there any way I can perform a similar operation in Flink? I need to go
>>> through every topic load sent and look at the individual rows. For example,
>>> what is the equivalent of
>>>
>>> *for(line <- pricesRDD.collect.toArray)*
>>> in Flink?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is

Logs are not easy to read through webUI

2018-07-30 Thread Xinyu Zhang
Hi all

We use Flink on YARN and the Flink version is 1.4.

When a streaming job runs for a long time, the webUI cannot show logs. This
may be because the log size is too large.

However, if we use the DailyRollingFileAppender in log4j.properties to split
the logs by day, we can never see yesterday's log.
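
For reference, the kind of log4j.properties entry being described looks roughly
like this (a sketch only; the appender name, date pattern and layout are
illustrative, not our actual configuration):

log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n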

Are there any ideas that could make reading the logs easier?

Maybe we should add an interface that supports reading logs by time
interval. Besides, when we request the taskmanager logs through the webUI,
the jobmanager could redirect to a URL on the taskmanager so that users get
the logs directly (just like MR tasks), rather than the jobmanager downloading
the logs from the taskmanager and then sending them to users.

Thanks!

Xinyu Zhang


Re: Working out through individual messages in Flink

2018-07-30 Thread Fabian Hueske
Hi,

Flink processes streams record by record instead of micro-batching records
together. Since every record comes by itself, there is no for-each.
Simple record-by-record transformations can be done with a MapFunction, and
records can be filtered out with a FilterFunction. You can also implement a
FlatMapFunction that does both in one step.

Once the stream is transformed and filtered, you can write it to HBase with
a sink function.
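
A minimal sketch of that approach (Java DataStream API, the Scala API is
analogous; it assumes a DataStream<String> called "stream" coming from the
Kafka source and the UUID,ticker,timeissued,price layout shown earlier in this
thread, and the 90.0 threshold is only an example):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

DataStream<Tuple4<String, String, String, Float>> valuable = stream
    .flatMap(new FlatMapFunction<String, Tuple4<String, String, String, Float>>() {
        @Override
        public void flatMap(String line, Collector<Tuple4<String, String, String, Float>> out) {
            String[] f = line.split(",");
            if (f.length == 4) {                          // drop malformed lines
                float price = Float.parseFloat(f[3].trim());
                if (price > 90.0f) {                      // keep only the "valuable" prices
                    out.collect(Tuple4.of(f[0].trim(), f[1].trim(), f[2].trim(), price));
                }
            }
        }
    });

// valuable.addSink(...);  // e.g. a custom HBase sink function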


2018-07-30 10:03 GMT+02:00 Mich Talebzadeh :

> Just to clarify these are the individual prices separated by ','. The
> below shows three price lines in the topic
>
> UUID,Security, Time,Price
> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh 
> wrote:
>
>>
>> Hi,
>>
>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>
>> In Spark Streaming I go through the RDD and walk through the rows one by one
>> and check prices.
>> If prices are valuable I store them into an HBase table.
>>
>> val dstream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>> dstream.cache()
>> dstream.foreachRDD
>> { pricesRDD =>
>>   // Work on individual messages
>>   *   for(line <- pricesRDD.collect.toArray)*
>>  {
>>var key = line._2.split(',').view(0).toString
>>var ticker =  line._2.split(',').view(1).toString
>>var timeissued = line._2.split(',').view(2).toString
>>var price = line._2.split(',').view(3).toFloat
>>val priceToString = line._2.split(',').view(3)
>> if (price > 90.0)
>>{
>>//save to Hbase table
>>}
>>   }
>>  }
>>
>> That works fine.
>>
>> In Flink I define my source as below
>>
>> val streamExecEnv = StreamExecutionEnvironment.
>> getExecutionEnvironment
>> streamExecEnv.setStreamTimeCharacteristic(
>> TimeCharacteristic.EventTime)
>> val stream = streamExecEnv
>>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>> SimpleStringSchema(), properties))
>>
>> Is there any way I can perform a similar operation in Flink? I need to go
>> through every topic load sent and look at the individual rows. For example,
>> what is the equivalent of
>>
>> *for(line <- pricesRDD.collect.toArray)*
>> in Flink?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Just to clarify, these are the individual prices separated by ','. The lines
below show three price lines in the topic:

UUID,Security, Time,Price
1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh 
wrote:

>
> Hi,
>
> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>
> In Spark Streaming I go through the RDD and walk through the rows one by one
> and check prices.
> If prices are valuable I store them into an HBase table.
>
> val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
> dstream.cache()
> dstream.foreachRDD
> { pricesRDD =>
>   // Work on individual messages
>   *   for(line <- pricesRDD.collect.toArray)*
>  {
>var key = line._2.split(',').view(0).toString
>var ticker =  line._2.split(',').view(1).toString
>var timeissued = line._2.split(',').view(2).toString
>var price = line._2.split(',').view(3).toFloat
>val priceToString = line._2.split(',').view(3)
> if (price > 90.0)
>{
>//save to Hbase table
>}
>   }
>  }
>
> That works fine.
>
> In Flink I define my source as below
>
> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val stream = streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>
> Is there any way I can perform a similar operation in Flink? I need to go
> through every topic load sent and look at the individual rows. For example,
> what is the equivalent of
>
> *for(line <- pricesRDD.collect.toArray)*
> in Flink?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Working out through individual messages in Flink

2018-07-30 Thread Renjie Liu
Hi Mich,
You can write a sink function for that.
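
A hedged sketch of such a sink (Java, using the HBase 1.x client API; it assumes
an existing HBase table called "prices" with column family "cf" and the
(key, ticker, timeissued, price) tuples discussed earlier in the thread, so the
table and column names are illustrative only):

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBasePriceSink extends RichSinkFunction<Tuple4<String, String, String, Float>> {

    private transient Connection connection;
    private transient Table table;

    @Override
    public void open(Configuration parameters) throws Exception {
        // hbase-site.xml has to be on the classpath so the client can find the cluster
        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        table = connection.getTable(TableName.valueOf("prices"));
    }

    @Override
    public void invoke(Tuple4<String, String, String, Float> value) throws Exception {
        Put put = new Put(Bytes.toBytes(value.f0));   // row key = UUID
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("ticker"), Bytes.toBytes(value.f1));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("timeissued"), Bytes.toBytes(value.f2));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("price"), Bytes.toBytes(value.f3));
        table.put(put);
    }

    @Override
    public void close() throws Exception {
        if (table != null) {
            table.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

It would be attached to the filtered stream with something like
filteredStream.addSink(new HBasePriceSink()).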

On Mon, Jul 30, 2018 at 2:58 PM Mich Talebzadeh 
wrote:

>
> Hi,
>
> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>
> In Spark Streaming I go through the RDD and walk through the rows one by one
> and check prices.
> If prices are valuable I store them into an HBase table.
>
> val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
> dstream.cache()
> dstream.foreachRDD
> { pricesRDD =>
>   // Work on individual messages
>   *   for(line <- pricesRDD.collect.toArray)*
>  {
>var key = line._2.split(',').view(0).toString
>var ticker =  line._2.split(',').view(1).toString
>var timeissued = line._2.split(',').view(2).toString
>var price = line._2.split(',').view(3).toFloat
>val priceToString = line._2.split(',').view(3)
> if (price > 90.0)
>{
>//save to Hbase table
>}
>   }
>  }
>
> That works fine.
>
> In Flink I define my source as below
>
> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val stream = streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>
> Is there any way I can perform a similar operation in Flink? I need to go
> through every topic load sent and look at the individual rows. For example,
> what is the equivalent of
>
> *for(line <- pricesRDD.collect.toArray)*
> in Flink?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
-- 
Liu, Renjie
Software Engineer, MVAD


Re: Flink wrong Watermark in Periodic watermark

2018-07-30 Thread Xingcan Cui
Hi Soheil,

That may relate to your parallelism, since each extractor instance computes its
own watermarks. Try printing the max timestamps together with the current
thread's name and you will notice this.
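
For illustration only, inside getCurrentWatermark() that could look like the
following (a sketch against the code quoted below, with MAX_TIMESTAMP and DELEY
as defined there):

@Nullable
@Override
public Watermark getCurrentWatermark() {
    // include the thread name so each parallel extractor instance can be told apart
    System.out.println(Thread.currentThread().getName()
            + " -> max timestamp " + MAX_TIMESTAMP
            + ", watermark " + (MAX_TIMESTAMP - DELEY));
    return new Watermark(MAX_TIMESTAMP - DELEY);
}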

Best,
Xingcan

> On Jul 30, 2018, at 3:05 PM, Soheil Pourbafrani  wrote:
> 
> Using Flink EventTime feature, I implement the class 
> AssignerWithPeriodicWatermark such that:
> 
> public static class SampleTimestampExtractor implements 
> AssignerWithPeriodicWatermarks> {
> private static final long serialVersionUID = 1L;
> private long MAX_TIMESTAMP;
> private final long DELEY = 3000;
> 
> 
> @Override
> public long extractTimestamp(Tuple3 t, long l) {
> long timestamp = t.f1 ;
> MAX_TIMESTAMP =  Math.max(timestamp , MAX_TIMESTAMP);
> System.out.println("Max TimeStamp : " + MAX_TIMESTAMP);
> return timestamp ;
> }
> 
> @Nullable
> @Override
> public Watermark getCurrentWatermark() {
> System.out.println("Current WatreMark : " + (MAX_TIMESTAMP - DELEY));
> return new Watermark(MAX_TIMESTAMP - DELEY);
> }
> }
> In addition, I set the watermark interval to 100 milliseconds:
> env.getConfig().setAutoWatermarkInterval(100);
> But when I check the logs, some watermarks are -3000, so in 
> getCurrentWatermark method, it considers the MAX_TIMESTAMP zero (0 - 3000 = 
> -3000), while I can see in the logs that the MAX_TIMESTAMP has a value 
> greater than zero!
> Here is a part of the output:
> Max TimeStamp : 1532934243136
> Max TimeStamp : 1532934243136
> Max TimeStamp : 1532934243144
> Max TimeStamp : 1532934243144
> Max TimeStamp : 1532934243152
> Max TimeStamp : 1532934243152
> Max TimeStamp : 1532934243160
> Max TimeStamp : 1532934243160
> Max TimeStamp : 1532934243168
> Max TimeStamp : 1532934243168
> Current WatreMark : 1532934240168
> Current WatreMark : -3000
> Current WatreMark : -3000
> Current WatreMark : 1532934240168
> Max TimeStamp : 1532934243176
> Max TimeStamp : 1532934243176
> Max TimeStamp : 1532934243184
> Max TimeStamp : 1532934243200
> Max TimeStamp : 1532934243208
> Max TimeStamp : 1532934243184
> 
> 



Flink wrong Watermark in Periodic watermark

2018-07-30 Thread Soheil Pourbafrani
Using the Flink event time feature, I implemented the class
AssignerWithPeriodicWatermarks as follows:

public static class SampleTimestampExtractor implements
AssignerWithPeriodicWatermarks> {
private static final long serialVersionUID = 1L;
private long MAX_TIMESTAMP;
private final long DELEY = 3000;


@Override
public long extractTimestamp(Tuple3 t, long l) {
long timestamp = t.f1 ;
MAX_TIMESTAMP =  Math.max(timestamp , MAX_TIMESTAMP);
System.out.println("Max TimeStamp : " + MAX_TIMESTAMP);
return timestamp ;
}

@Nullable
@Override
public Watermark getCurrentWatermark() {
System.out.println("Current WatreMark : " + (MAX_TIMESTAMP - DELEY));
return new Watermark(MAX_TIMESTAMP - DELEY);
}
}

In addition, I set the watermark interval to 100 milliseconds:

env.getConfig().setAutoWatermarkInterval(100);

But when I check the logs, some watermarks are -3000, meaning that in the
getCurrentWatermark method MAX_TIMESTAMP is taken as zero (0 - 3000 = -3000),
while I can see in the logs that MAX_TIMESTAMP has a value greater than zero!
Here is a part of the output:
Max TimeStamp : 1532934243136
Max TimeStamp : 1532934243136
Max TimeStamp : 1532934243144
Max TimeStamp : 1532934243144
Max TimeStamp : 1532934243152
Max TimeStamp : 1532934243152
Max TimeStamp : 1532934243160
Max TimeStamp : 1532934243160
Max TimeStamp : 1532934243168
Max TimeStamp : 1532934243168
Current WatreMark : 1532934240168
Current WatreMark : -3000
Current WatreMark : -3000
Current WatreMark : 1532934240168
Max TimeStamp : 1532934243176
Max TimeStamp : 1532934243176
Max TimeStamp : 1532934243184
Max TimeStamp : 1532934243200
Max TimeStamp : 1532934243208
Max TimeStamp : 1532934243184
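
A possible way to avoid the spurious -3000 values, sketched here as a guard in
getCurrentWatermark() (an illustration only, based on the class above): emit no
watermark until this parallel instance has seen at least one element.

@Nullable
@Override
public Watermark getCurrentWatermark() {
    if (MAX_TIMESTAMP == 0) {
        // no element seen by this parallel instance yet,
        // so do not emit 0 - DELEY = -3000
        return null;
    }
    return new Watermark(MAX_TIMESTAMP - DELEY);
}

Note that returning null only suppresses the bogus value; the overall watermark
still cannot advance until every parallel instance has received data.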