Is Hadoop 3.0 integration planned?

2018-03-20 Thread Jayant Ameta
Jayant Ameta


Re: Restart hook and checkpoint

2018-03-20 Thread Ashish Pokharel
I definitely like the idea of event based checkpointing :) 

Fabian, I do agree with your point that it is not possible to take a rescue 
checkpoint consistently. The concern here, however, is not the operator that 
actually failed. It's to avoid data loss across 100s (probably 1000s) of 
parallel operators which are being restarted and are "healthy". We have 100k 
(nearing a million soon) elements pushing data. Losing a few seconds' worth of 
data for a few of them is not good but "acceptable", as long as the damage can 
be controlled. Of course, we are going to use RocksDB + 2-phase commit with 
Kafka where we need exactly-once guarantees. The proposal of "fine-grained 
recovery" 
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures) 
seems like a good start, at least from a damage-control perspective, but even 
with that it feels like an "event based approach" could be applied to the 
sub-set of the job graph that is "healthy". 

Thanks, Ashish


> On Mar 20, 2018, at 9:53 AM, Fabian Hueske  wrote:
> 
> Well, that's not that easy to do, because checkpoints must be coordinated and 
> triggered by the JobManager.
> Also, the checkpointing mechanism with flowing checkpoint barriers (to ensure 
> checkpoint consistency) won't work once a task failed because it cannot 
> continue processing and forward barriers. If the task failed with an OOME, 
> the whole JVM is gone anyway.
> I don't think it is possible to take something like a consistent rescue 
> checkpoint in case of a failure. 
> 
> It might be possible to checkpoint the application state of non-failed tasks, but 
> this would result in data loss for the failed task, and we would need to weigh 
> the use cases for such a feature against the implementation effort.
> Maybe there are better ways to address such use cases.
> 
> Best, Fabian
> 
> 2018-03-20 6:43 GMT+01:00 makeyang  >:
> currently there is only a time-based way to trigger a checkpoint. Based on this
> discussion, I think Flink needs to introduce an event-based way to trigger a
> checkpoint, such that restarting a task manager counts as an event.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 
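
For reference, the "time based way" mentioned above is the checkpoint interval
configured on the execution environment; there is currently no API hook to
request an extra checkpoint on an event such as a TaskManager restart. A
minimal sketch of the existing, purely interval-based configuration:

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointConfigSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoints are triggered on a fixed interval (here every 60 s),
    // coordinated by the JobManager.
    env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000)
    // ... define sources, transformations and sinks, then call env.execute() ...
  }
}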



Re: CsvSink

2018-03-20 Thread karim amer
Never mind, I found the error and it has nothing to do with Flink.
Sorry

On Tue, Mar 20, 2018 at 12:12 PM, karim amer 
wrote:

> here is the output after fixing the scala issues
>
> https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c
>
> On Tue, Mar 20, 2018 at 11:39 AM, karim amer 
> wrote:
>
>> Never mind after importing
>>
>> import org.apache.flink.api.scala._
>>
>> these errors went away and I still have the original problem.
>> Sorry my bad
>>
>> On Tue, Mar 20, 2018 at 11:04 AM, karim amer 
>> wrote:
>>
>>> To clarify should i file a bug report on sbt hiding the errors in the
>>> previous email ?
>>>
>>> On Tue, Mar 20, 2018 at 9:44 AM, karim amer 
>>> wrote:
>>>
 After switching to Maven from Sbt I got these errors
 Error:(63, 37) could not find implicit value for evidence parameter of
 type org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
 che.flink.quickstart.DataStreamtotableapi.Calls]
 val namedStream = dataStream.map((value:String) => {


 Error:(63, 37) not enough arguments for method map: (implicit
 evidence$7: org.apache.flink.api.common.ty
 peinfo.TypeInformation[org.apache.flink.quickstart.DataStrea
 mtotableapi.Calls])org.apache.flink.streaming.api.scala.Data
 Stream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
 Unspecified value parameter evidence$7.
 val namedStream = dataStream.map((value:String) => {


 Should i file a bug report  ?

 On Tue, Mar 20, 2018 at 9:30 AM, karim amer 
 wrote:

> Hi Fabian
> Sorry if i confused you The first error is from Nico's code Not my
> code or  snippet
> I am still having the original problem in my snippet where it's
> writing a blank csv file even though i get
> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
> After running the job
>
> Cheers,
> karim
>
> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske 
> wrote:
>
>> Hi Karim,
>>
>> I cannot find a method invocation 
>> "tableEnv.registerDataStream("myTable2",
>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>> It would help if you would keep error message and code consistent.
>> Otherwise it's not possible to figure out what's going on.
>>
>> Best, Fabian
>>
>> 2018-03-20 0:24 GMT+01:00 karim amer :
>>
>>> Hi Nico,
>>>
>>> I tried to reproduce your code but registerDataStream keeps failing
>>> to register the fields even though i am following your code and the 
>>> Docs.
>>> here is the error
>>> [error]  found   : Symbol
>>> [error]  required: org.apache.flink.table.expressions.Expression
>>> [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C
>>> )
>>> [error]
>>> I think my code snippet was misleading. Here is the full snippet
>>> Changing the name from table didn't fix it for
>>>
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.api.java.utils.ParameterTool
>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>> import org.apache.flink.streaming.api.{CheckpointingMode, 
>>> TimeCharacteristic}
>>> import 
>>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.api.windowing.time.Time
>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>> import org.apache.flink.types.Row
>>>
>>>
>>>
>>> object datastreamtotableapi {
>>>
>>>   case class Calls(a: String,
>>>b: String,
>>>c: String,
>>>d: String,
>>>e: String,
>>>f: String,
>>>g: String,
>>>h: String,
>>>i: String,
>>>j: String,
>>>k: String,
>>>l: String,
>>>m: String,
>>>n: String,
>>>p: String,
>>>q: String,
>>>r: String,
>>>s: String,
>>>t: String,
>>>v: String,
>>>w: String)
>>>
>>>
>>>   def main(args: Array[String]) {
>>>
>>> val params = ParameterTool.fromArgs(args)
>>> val input = params.getRequired("input")
>>>
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>

Re: [ANNOUNCE] Weekly community update #12

2018-03-20 Thread Stephan Ewen
Great initiative, highly appreciated, Till!


On Mon, Mar 19, 2018 at 7:06 PM, Till Rohrmann  wrote:

> Dear community,
>
> I've noticed that Flink has grown quite a bit in the past. As a
> consequence it can be quite challenging to stay up to date. Especially for
> community members who don't follow Flink's MLs on a daily basis.
>
> In order to keep a bigger part of the community in the loop, I wanted to
> try out a weekly update letter where I update the community with what
> happened from my perspective. Since I also don't know everything I want to
> encourage others to post updates about things they deem important and
> relevant for the community to this thread.
>
> # Weekly update #12:
>
> ## Flink 1.5 release:
> - The Flink community is still working on the Flink 1.5 release. Hopefully
> Flink 1.5 can be released in the next weeks.
> - The main work concentrated last week on stabilizing Flip-6 and adding
> more automated tests [1]. The Flink community appreciates every helping
> hand with adding more end to end tests.
> - Consequently, the committed changes mainly consisted of bug fixes and
> test hardening.
> - By the end of this week, we hope to have an RC ready which can be used
> for easier release testing. Given the big changes (network stack and
> Flip-6), the RC will most likely still contain some rough edges. In order
> to smooth them out, it would be good if we run Flink 1.5 in as many
> different scenarios as possible.
>
> ## Flink 1.3.3 has been released
> - Flink 1.3.3 containing an important fix for properly handling
> checkpoints in case of a DFS problem has been released. We highly recommend
> that all users running Flink 1.3.2 upgrade swiftly to Flink 1.3.3.
>
> ## Misc:
> - Shuyi opened a discussion about improving Flink's security [2]. If you
> are interested and want to help with the next steps please engage in the
> discussion.
>
> PS: Don't worry that you've missed the first 11 weekly community updates.
> It's just this week's number.
>
> [1] http://apache-flink-mailing-list-archive.1008284.n3.
> nabble.com/ANNOUNCE-Flink-1-5-release-testing-effort-td21646.html
> [2] http://apache-flink-mailing-list-archive.1008284.
> n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>
> Cheers,
> Till
>


Re: Strange behavior on filter, group and reduce DataSets

2018-03-20 Thread Stephan Ewen
To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final fields, no setters,
set fields in constructor instead). Does that make the problem go away?

  - Change the Person data type to not be a POJO by adding a dummy field
that is never used, but does not have a getter/setter. Does that make the
problem go away?

If either of those is the case, it must be a mutability bug somewhere in
either accidental object reuse or accidental serializer sharing.
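
Two of the checks raised in the quoted discussion below (object reuse and the
generated execution plan) can be inspected with a few lines. A minimal sketch,
assuming a DataSet program built on `env` (the tiny pipeline here is only a
placeholder so that the plan is non-empty):

import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._

object DebugPlanSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Rule out accidental object reuse while debugging.
    env.getConfig.disableObjectReuse()

    // ... build the filter/reduce/union program here ...
    env.fromElements(1, 2, 3)
      .filter(_ > 1)
      .map(_ * 2)
      .output(new DiscardingOutputFormat[Int])

    // Print the optimizer plan that was asked for (call this before execute()).
    println(env.getExecutionPlan())
  }
}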


On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske  wrote:

> Hi Simone and Flavio,
>
> I created FLINK-9031 [1] for this issue.
> Please have a look and add any detail that you think could help to resolve
> the problem.
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9031
>
> 2018-03-19 16:35 GMT+01:00 simone :
>
>> Hi Fabian,
>>
>> This simple code reproduces the behavior ->
>> https://github.com/xseris/Flink-test-union
>>
>> Thanks, Simone.
>>
>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>
>> Hmmm, I still don't see the problem.
>> IMO, the result should be correct for both plans. The data is replicated,
>> filtered, reduced, and unioned.
>> There is nothing in between the filter and reduce, that could cause
>> incorrect behavior.
>>
>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>> is either the Flink runtime code or your functions.
>> Given that one plan produces good results, it might be the Flink runtime
>> code.
>>
>> Coming back to my previous question.
>> Can you provide a minimal program to reproduce the issue?
>>
>> Thanks, Fabian
>>
>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske :
>>
>>> Ah, thanks for the update!
>>> I'll have a look at that.
>>>
>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske :
>>>
 HI Simone,

 Looking at the plan, I don't see why this should be happening. The
 pseudo code looks fine as well.
 Any chance that you can create a minimal program to reproduce the
 problem?

 Thanks,
 Fabian

 2018-03-19 12:04 GMT+01:00 simone :

> Hi Fabian,
>
> reuse is not enabled. I attach the plan of the execution.
>
> Thanks,
> Simone
>
> On 19/03/2018 11:36, Fabian Hueske wrote:
>
> Hi,
>
> Union is actually a very simple operator (not even an operator in
> Flink terms). It just merges two inputs. There is no additional logic
> involved.
> Therefore, it should also not emit records before either of both
> ReduceFunctions sorted its data.
> Once the data has been sorted for the ReduceFunction, the data is
> reduced and emitted in a pipelined fashion, i.e., once the first record is
> reduced, it is forwarded into the MapFunction (passing the unioned 
> inputs).
> So it is not unexpected that Map starts processing before the
> ReduceFunction terminated.
>
> Did you enable object reuse [1]?
> If yes, try to disable it. If you want to reuse objects, you have to
> be careful in how you implement your functions.
> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
> that was generated for the program?
>
> Thanks,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> dev/batch/index.html#operating-on-data-objects-in-functions
>
>
>
> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>
>> Any help on this? This thing is very strange..the "manual" union of
>> the output of the 2 datasets is different than the flink-union of them..
>> Could it be a problem of the flink optimizer?
>>
>> Best,
>> Flavio
>>
>> On Fri, Mar 16, 2018 at 4:01 PM, simone 
>> wrote:
>>
>>> Sorry, I translated the code into pseudocode too fast. That is
>>> indeed an equals.
>>>
>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>
>>> Hi,
>>>
>>> Just a guess, but string compare in Java should be using the equals
>>> method, not == operator.
>>>
>>> Regards,
>>>
>>> Kien
>>>
>>>
>>> On 3/16/2018 9:47 PM, simone wrote:
>>>
>>> *subject.getField("field1") == "";*
>>>
>>>
>>>
>>
>>
>
>

>>>
>>
>>
>


Re: CsvSink

2018-03-20 Thread karim amer
Here is the output after fixing the Scala issues:

https://gist.github.com/karimamer/9e3bcf0a6d9110c01caa2ebd14aa7a8c

On Tue, Mar 20, 2018 at 11:39 AM, karim amer 
wrote:

> Never mind after importing
>
> import org.apache.flink.api.scala._
>
> these errors went away and I still have the original problem.
> Sorry my bad
>
> On Tue, Mar 20, 2018 at 11:04 AM, karim amer 
> wrote:
>
>> To clarify should i file a bug report on sbt hiding the errors in the
>> previous email ?
>>
>> On Tue, Mar 20, 2018 at 9:44 AM, karim amer 
>> wrote:
>>
>>> After switching to Maven from Sbt I got these errors
>>> Error:(63, 37) could not find implicit value for evidence parameter of
>>> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>>> che.flink.quickstart.DataStreamtotableapi.Calls]
>>> val namedStream = dataStream.map((value:String) => {
>>>
>>>
>>> Error:(63, 37) not enough arguments for method map: (implicit
>>> evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>>> che.flink.quickstart.DataStreamtotableapi.Calls])org.apache.
>>> flink.streaming.api.scala.DataStream[org.apache.flink.quicks
>>> tart.DataStreamtotableapi.Calls].
>>> Unspecified value parameter evidence$7.
>>> val namedStream = dataStream.map((value:String) => {
>>>
>>>
>>> Should i file a bug report  ?
>>>
>>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer 
>>> wrote:
>>>
 Hi Fabian
 Sorry if i confused you The first error is from Nico's code Not my code
 or  snippet
 I am still having the original problem in my snippet where it's writing
 a blank csv file even though i get
 [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
 After running the job

 Cheers,
 karim

 On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske 
 wrote:

> Hi Karim,
>
> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
> set, 'A, 'B, 'C )" as shown in the error message in your example.
> It would help if you would keep error message and code consistent.
> Otherwise it's not possible to figure out what's going on.
>
> Best, Fabian
>
> 2018-03-20 0:24 GMT+01:00 karim amer :
>
>> Hi Nico,
>>
>> I tried to reproduce your code but registerDataStream keeps failing
>> to register the fields even though i am following your code and the Docs.
>> here is the error
>> [error]  found   : Symbol
>> [error]  required: org.apache.flink.table.expressions.Expression
>> [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>> [error]
>> I think my code snippet was misleading. Here is the full snippet
>> Changing the name from table didn't fix it for
>>
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.core.fs.FileSystem.WriteMode
>> import org.apache.flink.streaming.api.{CheckpointingMode, 
>> TimeCharacteristic}
>> import 
>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.table.api.{Table, TableEnvironment}
>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>> import org.apache.flink.types.Row
>>
>>
>>
>> object datastreamtotableapi {
>>
>>   case class Calls(a: String,
>>b: String,
>>c: String,
>>d: String,
>>e: String,
>>f: String,
>>g: String,
>>h: String,
>>i: String,
>>j: String,
>>k: String,
>>l: String,
>>m: String,
>>n: String,
>>p: String,
>>q: String,
>>r: String,
>>s: String,
>>t: String,
>>v: String,
>>w: String)
>>
>>
>>   def main(args: Array[String]) {
>>
>> val params = ParameterTool.fromArgs(args)
>> val input = params.getRequired("input")
>>
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> env.setParallelism(1)
>> val tableEnv = TableEnvironment.getTableEnvironment(env)
>>
>> val dataStream = env.readTextFile(input)
>>
>> val namedStream = dataStream.map((value:String) => {
>>

Re: CsvSink

2018-03-20 Thread karim amer
Never mind after importing

import org.apache.flink.api.scala._

these errors went away and I still have the original problem.
Sorry, my bad.
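
The underlying cause is that the Scala DataStream API needs an implicit
TypeInformation for each user type; the package-object import brings the
createTypeInformation macro into scope, which is why the "evidence parameter"
errors disappear. A minimal sketch (for the DataStream API the
org.apache.flink.streaming.api.scala._ wildcard import is sufficient on its own):

import org.apache.flink.streaming.api.scala._

object EvidenceSketch {
  case class Calls(a: String, b: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Without the wildcard import above, this map() fails to compile with
    // "could not find implicit value for evidence parameter of type
    // TypeInformation[...]".
    val named: DataStream[Calls] = env.fromElements("x,1", "y,2").map { line =>
      val cols = line.split(",")
      Calls(cols(0), cols(1))
    }
    named.print()
    env.execute("evidence sketch")
  }
}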

On Tue, Mar 20, 2018 at 11:04 AM, karim amer 
wrote:

> To clarify should i file a bug report on sbt hiding the errors in the
> previous email ?
>
> On Tue, Mar 20, 2018 at 9:44 AM, karim amer 
> wrote:
>
>> After switching to Maven from Sbt I got these errors
>> Error:(63, 37) could not find implicit value for evidence parameter of
>> type org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>> che.flink.quickstart.DataStreamtotableapi.Calls]
>> val namedStream = dataStream.map((value:String) => {
>>
>>
>> Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apa
>> che.flink.quickstart.DataStreamtotableapi.Calls])org.apache.
>> flink.streaming.api.scala.DataStream[org.apache.flink.
>> quickstart.DataStreamtotableapi.Calls].
>> Unspecified value parameter evidence$7.
>> val namedStream = dataStream.map((value:String) => {
>>
>>
>> Should i file a bug report  ?
>>
>> On Tue, Mar 20, 2018 at 9:30 AM, karim amer 
>> wrote:
>>
>>> Hi Fabian
>>> Sorry if i confused you The first error is from Nico's code Not my code
>>> or  snippet
>>> I am still having the original problem in my snippet where it's writing
>>> a blank csv file even though i get
>>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>>> After running the job
>>>
>>> Cheers,
>>> karim
>>>
>>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske 
>>> wrote:
>>>
 Hi Karim,

 I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
 set, 'A, 'B, 'C )" as shown in the error message in your example.
 It would help if you would keep error message and code consistent.
 Otherwise it's not possible to figure out what's going on.

 Best, Fabian

 2018-03-20 0:24 GMT+01:00 karim amer :

> Hi Nico,
>
> I tried to reproduce your code but registerDataStream keeps failing
> to register the fields even though i am following your code and the Docs.
> here is the error
> [error]  found   : Symbol
> [error]  required: org.apache.flink.table.expressions.Expression
> [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
> [error]
> I think my code snippet was misleading. Here is the full snippet
> Changing the name from table didn't fix it for
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.core.fs.FileSystem.WriteMode
> import org.apache.flink.streaming.api.{CheckpointingMode, 
> TimeCharacteristic}
> import 
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.table.api.{Table, TableEnvironment}
> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
> import org.apache.flink.types.Row
>
>
>
> object datastreamtotableapi {
>
>   case class Calls(a: String,
>b: String,
>c: String,
>d: String,
>e: String,
>f: String,
>g: String,
>h: String,
>i: String,
>j: String,
>k: String,
>l: String,
>m: String,
>n: String,
>p: String,
>q: String,
>r: String,
>s: String,
>t: String,
>v: String,
>w: String)
>
>
>   def main(args: Array[String]) {
>
> val params = ParameterTool.fromArgs(args)
> val input = params.getRequired("input")
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setParallelism(1)
> val tableEnv = TableEnvironment.getTableEnvironment(env)
>
> val dataStream = env.readTextFile(input)
>
> val namedStream = dataStream.map((value:String) => {
>
>   val columns = value.split(",")
>   Calls(columns(0), columns(1),columns(2),columns(3), 
> columns(4),columns(5),
> columns(6), columns(7),columns(8),columns(9), columns(10), 
> columns(11),
> columns(12), columns(13),columns(14),columns(15), columns(16), 
> columns(17),
>  

Re: CsvSink

2018-03-20 Thread karim amer
To clarify, should I file a bug report on sbt hiding the errors in the
previous email?

On Tue, Mar 20, 2018 at 9:44 AM, karim amer  wrote:

> After switching to Maven from Sbt I got these errors
> Error:(63, 37) could not find implicit value for evidence parameter of
> type org.apache.flink.api.common.typeinfo.TypeInformation[org.
> apache.flink.quickstart.DataStreamtotableapi.Calls]
> val namedStream = dataStream.map((value:String) => {
>
>
> Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
> org.apache.flink.api.common.typeinfo.TypeInformation[org.
> apache.flink.quickstart.DataStreamtotableapi.Calls])
> org.apache.flink.streaming.api.scala.DataStream[org.
> apache.flink.quickstart.DataStreamtotableapi.Calls].
> Unspecified value parameter evidence$7.
> val namedStream = dataStream.map((value:String) => {
>
>
> Should i file a bug report  ?
>
> On Tue, Mar 20, 2018 at 9:30 AM, karim amer 
> wrote:
>
>> Hi Fabian
>> Sorry if i confused you The first error is from Nico's code Not my code
>> or  snippet
>> I am still having the original problem in my snippet where it's writing a
>> blank csv file even though i get
>> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
>> After running the job
>>
>> Cheers,
>> karim
>>
>> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske  wrote:
>>
>>> Hi Karim,
>>>
>>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>>> It would help if you would keep error message and code consistent.
>>> Otherwise it's not possible to figure out what's going on.
>>>
>>> Best, Fabian
>>>
>>> 2018-03-20 0:24 GMT+01:00 karim amer :
>>>
 Hi Nico,

 I tried to reproduce your code but registerDataStream keeps failing to
 register the fields even though i am following your code and the Docs.
 here is the error
 [error]  found   : Symbol
 [error]  required: org.apache.flink.table.expressions.Expression
 [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
 [error]
 I think my code snippet was misleading. Here is the full snippet
 Changing the name from table didn't fix it for

 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.{CheckpointingMode, 
 TimeCharacteristic}
 import 
 org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.table.api.{Table, TableEnvironment}
 import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
 import org.apache.flink.types.Row



 object datastreamtotableapi {

   case class Calls(a: String,
b: String,
c: String,
d: String,
e: String,
f: String,
g: String,
h: String,
i: String,
j: String,
k: String,
l: String,
m: String,
n: String,
p: String,
q: String,
r: String,
s: String,
t: String,
v: String,
w: String)


   def main(args: Array[String]) {

 val params = ParameterTool.fromArgs(args)
 val input = params.getRequired("input")


 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 env.setParallelism(1)
 val tableEnv = TableEnvironment.getTableEnvironment(env)

 val dataStream = env.readTextFile(input)

 val namedStream = dataStream.map((value:String) => {

   val columns = value.split(",")
   Calls(columns(0), columns(1),columns(2),columns(3), 
 columns(4),columns(5),
 columns(6), columns(7),columns(8),columns(9), columns(10), 
 columns(11),
 columns(12), columns(13),columns(14),columns(15), columns(16), 
 columns(17),
 columns(18),columns(19), columns(20)
   )
 })


val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")

val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new 
 BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
  override def 

Re: CsvSink

2018-03-20 Thread karim amer
After switching to Maven from Sbt I got these errors
Error:(63, 37) could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls]
val namedStream = dataStream.map((value:String) => {


Error:(63, 37) not enough arguments for method map: (implicit evidence$7:
org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.quickstart.DataStreamtotableapi.Calls])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.quickstart.DataStreamtotableapi.Calls].
Unspecified value parameter evidence$7.
val namedStream = dataStream.map((value:String) => {


Should I file a bug report?

On Tue, Mar 20, 2018 at 9:30 AM, karim amer  wrote:

> Hi Fabian
> Sorry if i confused you The first error is from Nico's code Not my code
> or  snippet
> I am still having the original problem in my snippet where it's writing a
> blank csv file even though i get
> [success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
> After running the job
>
> Cheers,
> karim
>
> On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske  wrote:
>
>> Hi Karim,
>>
>> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
>> set, 'A, 'B, 'C )" as shown in the error message in your example.
>> It would help if you would keep error message and code consistent.
>> Otherwise it's not possible to figure out what's going on.
>>
>> Best, Fabian
>>
>> 2018-03-20 0:24 GMT+01:00 karim amer :
>>
>>> Hi Nico,
>>>
>>> I tried to reproduce your code but registerDataStream keeps failing to
>>> register the fields even though i am following your code and the Docs.
>>> here is the error
>>> [error]  found   : Symbol
>>> [error]  required: org.apache.flink.table.expressions.Expression
>>> [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>>> [error]
>>> I think my code snippet was misleading. Here is the full snippet
>>> Changing the name from table didn't fix it for
>>>
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.api.java.utils.ParameterTool
>>> import org.apache.flink.core.fs.FileSystem.WriteMode
>>> import org.apache.flink.streaming.api.{CheckpointingMode, 
>>> TimeCharacteristic}
>>> import 
>>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.api.windowing.time.Time
>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>>> import org.apache.flink.types.Row
>>>
>>>
>>>
>>> object datastreamtotableapi {
>>>
>>>   case class Calls(a: String,
>>>b: String,
>>>c: String,
>>>d: String,
>>>e: String,
>>>f: String,
>>>g: String,
>>>h: String,
>>>i: String,
>>>j: String,
>>>k: String,
>>>l: String,
>>>m: String,
>>>n: String,
>>>p: String,
>>>q: String,
>>>r: String,
>>>s: String,
>>>t: String,
>>>v: String,
>>>w: String)
>>>
>>>
>>>   def main(args: Array[String]) {
>>>
>>> val params = ParameterTool.fromArgs(args)
>>> val input = params.getRequired("input")
>>>
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> env.setParallelism(1)
>>> val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>> val dataStream = env.readTextFile(input)
>>>
>>> val namedStream = dataStream.map((value:String) => {
>>>
>>>   val columns = value.split(",")
>>>   Calls(columns(0), columns(1),columns(2),columns(3), 
>>> columns(4),columns(5),
>>> columns(6), columns(7),columns(8),columns(9), columns(10), 
>>> columns(11),
>>> columns(12), columns(13),columns(14),columns(15), columns(16), 
>>> columns(17),
>>> columns(18),columns(19), columns(20)
>>>   )
>>> })
>>>
>>>
>>>val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>>
>>>val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new 
>>> BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>>  override def extractTimestamp(element: Calls): Long = 
>>> (element.j.concat(element.k)).toLong
>>>})
>>>
>>>
>>>
>>> tableEnv.registerDataStream("CDRS", watermarkedStream)
>>> val results = tableEnv.sqlQuery( """
>>>   |SELECT
>>>   | a
>>>   

Re: CsvSink

2018-03-20 Thread karim amer
Hi Fabian,
Sorry if I confused you. The first error is from Nico's code, not my code or
snippet.
I am still having the original problem in my snippet: it's writing a
blank CSV file even though I get
[success] Total time: 26 s, completed Mar 20, 2018 9:28:06 AM
after running the job.

Cheers,
karim

On Tue, Mar 20, 2018 at 6:25 AM, Fabian Hueske  wrote:

> Hi Karim,
>
> I cannot find a method invocation "tableEnv.registerDataStream("myTable2",
> set, 'A, 'B, 'C )" as shown in the error message in your example.
> It would help if you would keep error message and code consistent.
> Otherwise it's not possible to figure out what's going on.
>
> Best, Fabian
>
> 2018-03-20 0:24 GMT+01:00 karim amer :
>
>> Hi Nico,
>>
>> I tried to reproduce your code but registerDataStream keeps failing to
>> register the fields even though i am following your code and the Docs.
>> here is the error
>> [error]  found   : Symbol
>> [error]  required: org.apache.flink.table.expressions.Expression
>> [error] tableEnv.registerDataStream("myTable2", set, 'A, 'B, 'C )
>> [error]
>> I think my code snippet was misleading. Here is the full snippet Changing
>> the name from table didn't fix it for
>>
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.core.fs.FileSystem.WriteMode
>> import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
>> import 
>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.table.api.{Table, TableEnvironment}
>> import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
>> import org.apache.flink.types.Row
>>
>>
>>
>> object datastreamtotableapi {
>>
>>   case class Calls(a: String,
>>b: String,
>>c: String,
>>d: String,
>>e: String,
>>f: String,
>>g: String,
>>h: String,
>>i: String,
>>j: String,
>>k: String,
>>l: String,
>>m: String,
>>n: String,
>>p: String,
>>q: String,
>>r: String,
>>s: String,
>>t: String,
>>v: String,
>>w: String)
>>
>>
>>   def main(args: Array[String]) {
>>
>> val params = ParameterTool.fromArgs(args)
>> val input = params.getRequired("input")
>>
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> env.setParallelism(1)
>> val tableEnv = TableEnvironment.getTableEnvironment(env)
>>
>> val dataStream = env.readTextFile(input)
>>
>> val namedStream = dataStream.map((value:String) => {
>>
>>   val columns = value.split(",")
>>   Calls(columns(0), columns(1),columns(2),columns(3), 
>> columns(4),columns(5),
>> columns(6), columns(7),columns(8),columns(9), columns(10), 
>> columns(11),
>> columns(12), columns(13),columns(14),columns(15), columns(16), 
>> columns(17),
>> columns(18),columns(19), columns(20)
>>   )
>> })
>>
>>
>>val cleanedStream =  namedStream.filter(_.j == " ").filter(_.k==" ")
>>
>>val watermarkedStream = cleanedStream.assignTimestampsAndWatermarks(new 
>> BoundedOutOfOrdernessTimestampExtractor[Calls](Time.seconds(100)) {
>>  override def extractTimestamp(element: Calls): Long = 
>> (element.j.concat(element.k)).toLong
>>})
>>
>>
>>
>> tableEnv.registerDataStream("CDRS", watermarkedStream)
>> val results = tableEnv.sqlQuery( """
>>   |SELECT
>>   | a
>>   | FROM CDRS
>>""".stripMargin)
>>
>>
>> val result: Table = results
>>
>> val path = "file:///Users/test/1.txt"
>> val sink :TableSink[Row]=   new CsvTableSink(
>>   path, // output path
>>   fieldDelim = "|", // optional: delimit files by '|'
>>   numFiles = 1, // optional: write to a single file
>>   writeMode = WriteMode.OVERWRITE)
>>
>> result.writeToSink(sink)
>>
>>
>> env.execute("this job")
>>
>>   }
>> }
>>
>>
>>
>>
>> On Mon, Mar 19, 2018 at 9:13 AM, Nico Kruber 
>> wrote:
>>
>>> Hi Karim,
>>> when I was trying to reproduce your code, I got an exception with the
>>> name 'table' being used - by replacing it and completing the job with
>>> some input, I did see the csv file popping up. Also, the job was
>>> crashing when the 
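
Regarding the "found: Symbol, required:
org.apache.flink.table.expressions.Expression" error quoted above: the implicit
conversion that turns 'a, 'b, ... into field Expressions lives in the Scala
Table API package object, which is not among the imports in the snippet. A
minimal sketch, assuming Flink 1.4:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._   // provides the Symbol -> Expression conversion

object RegisterStreamSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val stream = env.fromElements(("x", 1L), ("y", 2L))
    // With the table.api.scala._ import in place, Scala Symbols are accepted
    // as field names.
    tableEnv.registerDataStream("myTable2", stream, 'a, 'b)

    val result = tableEnv.sqlQuery("SELECT a FROM myTable2")
    // ... write `result` to a TableSink, as in the snippet above ...
  }
}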

Re: Flink CEP window for 1 working day

2018-03-20 Thread shishal
Thanks Fabian,

So by non-working day I mean that I have a list of non-working days in a year,
which I can use to compare against.
I am very new to Flink and Flink CEP. Initially I thought there was a way to
set the within(time) value dynamically, so now I guess that's not
possible.

If I understand correctly, the other way around is to somehow manipulate the
stream (probably using a ProcessFunction), and maybe manipulate the event time.

Can you please help me with more pointers on how to go ahead with this?
Please note that I am using event-time processing.

Your help is much appreciated.
Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
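
One possible building block for the ProcessFunction route: compute the target
timestamp by skipping weekends and the known non-working days, and register an
event-time timer for it instead of relying on within(Time.days(1)). A minimal
sketch of the date arithmetic (the holiday set is a placeholder for your own
calendar):

import java.time.{DayOfWeek, LocalDate}

object WorkingDays {
  // Advance a date by n working days, skipping Saturdays, Sundays and the
  // given non-working days.
  def addWorkingDays(start: LocalDate, n: Int, holidays: Set[LocalDate]): LocalDate = {
    var day = start
    var remaining = n
    while (remaining > 0) {
      day = day.plusDays(1)
      val weekend = day.getDayOfWeek == DayOfWeek.SATURDAY ||
        day.getDayOfWeek == DayOfWeek.SUNDAY
      if (!weekend && !holidays.contains(day)) remaining -= 1
    }
    day
  }
}

Inside a ProcessFunction on a keyed stream you could convert the element's
event timestamp to a LocalDate, call addWorkingDays(date, 1, holidays), and
register an event-time timer for the resulting day via
ctx.timerService().registerEventTimeTimer(...).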


Re: Flink CEP window for 1 working day

2018-03-20 Thread Fabian Hueske
Hi,

I'm afraid, Flink CEP does not distinguish work days from non-work days.
Of course, you could implement the logic in a DataStream program (probably
using ProcessFunction).

Best, Fabian

2018-03-20 15:44 GMT+01:00 shishal :

> I am using Flink CEP, and to match an event pattern in a given time window we
> use *.within(Time.days(1))*.
>
> Now in one of the cases I need to wait for 1 working day instead of 1 day. Is
> there any way to do that in Flink CEP?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Flink CEP window for 1 working day

2018-03-20 Thread shishal
I am using Flink CEP, and to match an event pattern in a given time window we
use *.within(Time.days(1))*.

Now in one of the cases I need to wait for 1 working day instead of 1 day. Is
there any way to do that in Flink CEP?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Strange behavior on filter, group and reduce DataSets

2018-03-20 Thread Fabian Hueske
Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think could help to resolve
the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031

2018-03-19 16:35 GMT+01:00 simone :

> Hi Fabian,
>
> This simple code reproduces the behavior -> https://github.com/xseris/
> Flink-test-union
>
> Thanks, Simone.
>
> On 19/03/2018 15:44, Fabian Hueske wrote:
>
> Hmmm, I still don't see the problem.
> IMO, the result should be correct for both plans. The data is replicated,
> filtered, reduced, and unioned.
> There is nothing in between the filter and reduce, that could cause
> incorrect behavior.
>
> The good thing is, the optimizer seems to be fine. The bad thing is, it is
> either the Flink runtime code or your functions.
> Given that one plan produces good results, it might be the Flink runtime
> code.
>
> Coming back to my previous question.
> Can you provide a minimal program to reproduce the issue?
>
> Thanks, Fabian
>
> 2018-03-19 15:15 GMT+01:00 Fabian Hueske :
>
>> Ah, thanks for the update!
>> I'll have a look at that.
>>
>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske :
>>
>>> HI Simone,
>>>
>>> Looking at the plan, I don't see why this should be happening. The
>>> pseudo code looks fine as well.
>>> Any chance that you can create a minimal program to reproduce the
>>> problem?
>>>
>>> Thanks,
>>> Fabian
>>>
>>> 2018-03-19 12:04 GMT+01:00 simone :
>>>
 Hi Fabian,

 reuse is not enabled. I attach the plan of the execution.

 Thanks,
 Simone

 On 19/03/2018 11:36, Fabian Hueske wrote:

 Hi,

 Union is actually a very simple operator (not even an operator in Flink
 terms). It just merges to inputs. There is no additional logic involved.
 Therefore, it should also not emit records before either of both
 ReduceFunctions sorted its data.
 Once the data has been sorted for the ReduceFunction, the data is
 reduced and emitted in a pipelined fashion, i.e., once the first record is
 reduced, it is forwarded into the MapFunction (passing the unioned inputs).
 So it is not unexpected that Map starts processing before the
 ReduceFunction terminated.

 Did you enable object reuse [1]?
 If yes, try to disable it. If you want to reuse objects, you have to be
 careful in how you implement your functions.
 If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
 that was generated for the program?

 Thanks,
 Fabian

 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
 dev/batch/index.html#operating-on-data-objects-in-functions



 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :

> Any help on this? This thing is very strange..the "manual" union of
> the output of the 2 datasets is different than the flink-union of them..
> Could it be a problem of the flink optimizer?
>
> Best,
> Flavio
>
> On Fri, Mar 16, 2018 at 4:01 PM, simone 
> wrote:
>
>> Sorry, I translated the code into pseudocode too fast. That is indeed
>> an equals.
>>
>> On 16/03/2018 15:58, Kien Truong wrote:
>>
>> Hi,
>>
>> Just a guest, but string compare in Java should be using equals
>> method, not == operator.
>>
>> Regards,
>>
>> Kien
>>
>>
>> On 3/16/2018 9:47 PM, simone wrote:
>>
>> *subject.getField("field1") == "";*
>>
>>
>>
>
>


>>>
>>
>
>


Re: Metric Registry Warnings

2018-03-20 Thread PedroMrChaves
I have multiple sources but with distinct names and UIDs.

More information about my execution environment:

Flink version: 1.4.2 bundled with Hadoop 2.8
State backend: Hadoop 2.8
Job compiled for version 1.4.2 using the Scala 2.11 libraries.

I am using the com.github.davidb library to export the metrics to InfluxDB, as
exemplified here:
https://github.com/jgrier/flink-stuff/tree/master/flink-influx-reporter
but recompiled for version 1.4.2.



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Metric Registry Warnings

2018-03-20 Thread Chesnay Schepler
FLINK-7100 is about taskmanager metrics being registered twice, whereas 
here we're dealing with job metrics.


Do you have multiple sources? If so, do they have unique names?
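
For reference, distinct source names (and uids) are assigned directly on the
stream; the name is what shows up as "Source: EVENTS" in the metric identifier
from the exception. A minimal sketch with placeholder sources:

import org.apache.flink.streaming.api.scala._

object NamedSourcesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Give each source a distinct name and uid; the name becomes part of the
    // registered metric identifier.
    val eventsA = env.fromElements("a1", "a2").name("EVENTS-A").uid("events-a")
    val eventsB = env.fromElements("b1", "b2").name("EVENTS-B").uid("events-b")
    eventsA.union(eventsB).print()
    env.execute("named sources sketch")
  }
}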

On 20.03.2018 15:06, Fabian Hueske wrote:

Hi Pedro,

Can you reopen FLINK-7100 and post a comment with your error message 
and environment?


Thanks,
Fabian

2018-03-20 14:58 GMT+01:00 PedroMrChaves >:


Hello,

I still have the same issue with Flink Version 1.4.2.

java.lang.IllegalArgumentException: A metric named
.taskmanager.6aa8d13575228d38ae4abdfb37fa229e.CDC.Source:
EVENTS.1.numRecordsIn already exists
at
com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
at

org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
at

org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:319)
at

org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
at

org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:303)
at

org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:293)
at

org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:40)
at

org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:48)
at

org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:146)
at

org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:182)
at

org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:136)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)



-
Best Regards,
Pedro Chaves
--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/







Re: Help Required for Log4J

2018-03-20 Thread Puneet Kinra
Hi

Fabian, thanks for the reply. I fixed the issue that I was facing.
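
For readers hitting the same symptom (a dedicated appender whose output also
ends up in the main Flink log file): one common log4j fix is to disable
additivity for the custom logger, so its events are not passed on to the root
logger as well. A sketch against the configuration quoted below:

# keep amssource events out of the root logger's file appender
log4j.logger.amssource=DEBUG, amssourceAppender
log4j.additivity.amssource=false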

On Tue, Mar 20, 2018 at 7:31 PM, Fabian Hueske  wrote:

> Hi,
>
> TBH, I don't have much experience with logging, but you might want to
> consider using Side Outputs [1] to route invalid records into a separate
> stream.
> The stream can then be handled separately, e.g. written to files or Kafka or
> wherever.
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/stream/side_output.html
>
> 2018-03-20 10:36 GMT+01:00 Puneet Kinra 
> :
>
>> Hi
>>
>> I have a use case in which I want to log bad records in a log file. I
>> have configured log4j and the log file is getting generated, but the output
>> is also going to the Flink logs. I want to detach it from the Flink logs and
>> write it only to my log file.
>>
>> .Here is configuration
>> *(Note :AMSSource is the custom written adaptor here)*
>>
>> # This affects logging for both user code and Flink
>> log4j.rootLogger=INFO, file
>> log4j.logger.amssource=DEBUG, amssourceAppender
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>> #log4j.logger.org.apache.flink=INFO
>>
>> # The following lines keep the log level of common libraries/connectors on
>> # log level INFO. The root logger does not override this. You have to
>> manually
>> # change the log levels here.
>> log4j.logger.akka=INFO
>> log4j.logger.org.apache.kafka=INFO
>> log4j.logger.org.apache.hadoop=INFO
>> log4j.logger.org.apache.zookeeper=INFO
>>
>> # Log all infos in the given file
>> log4j.appender.file=org.apache.log4j.FileAppender
>> log4j.appender.file.file=D:\\logs\\flink-log
>> log4j.appender.file.append=false
>> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
>> %-5p %-60c %x - %m%n
>>
>> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.ch
>> annel.DefaultChannelPipeline=ERROR, file
>>
>>
>> #BonusPointAppender
>> log4j.appender.bonuspointAppender=org.apache.log4j.RollingFileAppender
>> log4j.appender.bonuspointAppender.MaxFileSize=1024MB
>> log4j.appender.bonuspointAppender.MaxBackupIndex=10
>> log4j.appender.bonuspointAppender.Append=true
>> log4j.appender.bonuspointAppender.File=D:\\logs\\flink-bpuser-bonus.logs
>> #log4j.appender.bonuspointAppender.DatePattern='.'yyyy-MM-dd
>> log4j.appender.bonuspointAppender.layout=org.apache.log4j.PatternLayout
>> log4j.appender.bonuspointAppender.layout.ConversionPattern=%d [%t] %-5p
>> (%C %M:%L) %x - %m%n
>>
>> #AMSSourceAppender
>> log4j.appender.amssourceAppender=org.apache.log4j.RollingFileAppender
>> log4j.appender.amssourceAppender.MaxFileSize=1024MB
>> log4j.appender.amssourceAppender.MaxBackupIndex=10
>> log4j.appender.amssourceAppender.Append=true
>> log4j.appender.amssourceAppender.File=D:\\logs\\flink-bpuser
>> -bonus-amssource.logs
>> #log4j.appender.amssourceAppender.DatePattern='.'yyyy-MM-dd
>> log4j.appender.amssourceAppender.layout=org.apache.log4j.PatternLayout
>> log4j.appender.amssourceAppender.layout.ConversionPattern=%d [%t] %-5p
>> (%C %M:%L) %x - %m%n
>>
>>
>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 <+91%2088001%2067808> | Skype :
>> puneet.ki...@customercentria.com *
>>
>> *e-mail :puneet.ki...@customercentria.com
>> *
>>
>>
>>
>


-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: Error while reporting metrics - ConcurrentModificationException

2018-03-20 Thread Chesnay Schepler
A wrapped Kafka metric was accessing state of the consumer while said 
state was modified.


As far as I can tell this is a Kafka issue and there's nothing we can do.

Unless this happens frequently it should be safe to ignore it.

On 20.03.2018 15:02, PedroMrChaves wrote:

Hello,

I have the following error while trying to report metrics to influxdb using
the DropwizardReporter.

2018-03-20 13:51:00,288 WARN
org.apache.flink.runtime.metrics.MetricRegistryImpl   - Error while
reporting metrics
java.util.ConcurrentModificationException
at
java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
at 
java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
at java.util.HashSet.<init>(HashSet.java:120)
at
org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
at
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
at
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
at
org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper.getValue(FlinkGaugeWrapper.java:36)
at
metrics_influxdb.measurements.MeasurementReporter.fromGauge(MeasurementReporter.java:163)
at
metrics_influxdb.measurements.MeasurementReporter.report(MeasurementReporter.java:55)
at
org.apache.flink.dropwizard.ScheduledDropwizardReporter.report(ScheduledDropwizardReporter.java:231)
at
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Any ideas on what might be the problem?




-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Metric Registry Warnings

2018-03-20 Thread Fabian Hueske
Hi Pedro,

Can you reopen FLINK-7100 and post a comment with your error message and
environment?

Thanks,
Fabian

2018-03-20 14:58 GMT+01:00 PedroMrChaves :

> Hello,
>
> I still have the same issue with Flink Version 1.4.2.
>
> java.lang.IllegalArgumentException: A metric named
> .taskmanager.6aa8d13575228d38ae4abdfb37fa229e.CDC.Source:
> EVENTS.1.numRecordsIn already exists
> at
> com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
> at
> org.apache.flink.dropwizard.ScheduledDropwizardReporter.
> notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
> at
> org.apache.flink.runtime.metrics.MetricRegistryImpl.
> register(MetricRegistryImpl.java:319)
> at
> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(
> AbstractMetricGroup.java:370)
> at
> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(
> AbstractMetricGroup.java:303)
> at
> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(
> AbstractMetricGroup.java:293)
> at
> org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:40)
> at
> org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:48)
> at
> org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(
> TaskMetricGroup.java:146)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(
> AbstractStreamOperator.java:182)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(
> AbstractUdfStreamOperator.java:82)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:136)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Help Required for Log4J

2018-03-20 Thread Fabian Hueske
Hi,

TBH, I don't have much experience with logging, but you might want to
consider using Side Outputs [1] to route invalid records into a separate
stream.
The stream can then be handled separately, e.g. written to files or Kafka or
wherever.

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/side_output.html
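
A minimal sketch of that side-output pattern (the isValid check and the input
elements are placeholders):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputSketch {
  val invalidTag = OutputTag[String]("invalid-records")

  def isValid(line: String): Boolean = line.nonEmpty   // placeholder check

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.fromElements("good record", "")

    val valid = lines.process(new ProcessFunction[String, String] {
      override def processElement(value: String,
                                  ctx: ProcessFunction[String, String]#Context,
                                  out: Collector[String]): Unit = {
        if (isValid(value)) out.collect(value)
        else ctx.output(invalidTag, value)   // route bad records to the side output
      }
    })

    // The invalid records can then be written to a file, Kafka, or wherever.
    valid.getSideOutput(invalidTag).print()
    env.execute("side output sketch")
  }
}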

2018-03-20 10:36 GMT+01:00 Puneet Kinra :

> Hi
>
> I have a use case in which I want to log bad records in a log file. I
> have configured log4j and the log file is getting generated, but the output
> is also going to the Flink logs. I want to detach it from the Flink logs and
> write it only to my log file.
>
> .Here is configuration
> *(Note :AMSSource is the custom written adaptor here)*
>
> # This affects logging for both user code and Flink
> log4j.rootLogger=INFO, file
> log4j.logger.amssource=DEBUG, amssourceAppender
>
> # Uncomment this if you want to _only_ change Flink's logging
> #log4j.logger.org.apache.flink=INFO
>
> # The following lines keep the log level of common libraries/connectors on
> # log level INFO. The root logger does not override this. You have to
> manually
> # change the log levels here.
> log4j.logger.akka=INFO
> log4j.logger.org.apache.kafka=INFO
> log4j.logger.org.apache.hadoop=INFO
> log4j.logger.org.apache.zookeeper=INFO
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=D:\\logs\\flink-log
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.ch
> annel.DefaultChannelPipeline=ERROR, file
>
>
> #BonusPointAppender
> log4j.appender.bonuspointAppender=org.apache.log4j.RollingFileAppender
> log4j.appender.bonuspointAppender.MaxFileSize=1024MB
> log4j.appender.bonuspointAppender.MaxBackupIndex=10
> log4j.appender.bonuspointAppender.Append=true
> log4j.appender.bonuspointAppender.File=D:\\logs\\flink-bpuser-bonus.logs
> #log4j.appender.bonuspointAppender.DatePattern='.'yyyy-MM-dd
> log4j.appender.bonuspointAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.bonuspointAppender.layout.ConversionPattern=%d [%t] %-5p
> (%C %M:%L) %x - %m%n
>
> #AMSSourceAppender
> log4j.appender.amssourceAppender=org.apache.log4j.RollingFileAppender
> log4j.appender.amssourceAppender.MaxFileSize=1024MB
> log4j.appender.amssourceAppender.MaxBackupIndex=10
> log4j.appender.amssourceAppender.Append=true
> log4j.appender.amssourceAppender.File=D:\\logs\\flink-
> bpuser-bonus-amssource.logs
> #log4j.appender.amssourceAppender.DatePattern='.'yyyy-MM-dd
> log4j.appender.amssourceAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.amssourceAppender.layout.ConversionPattern=%d [%t] %-5p
> (%C %M:%L) %x - %m%n
>
>
>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 <+91%2088001%2067808> | Skype :
> puneet.ki...@customercentria.com *
>
> *e-mail :puneet.ki...@customercentria.com
> *
>
>
>


Error while reporting metrics - ConcurrentModificationException

2018-03-20 Thread PedroMrChaves
Hello,

I have the following error while trying to report metrics to influxdb using
the DropwizardReporter.

2018-03-20 13:51:00,288 WARN 
org.apache.flink.runtime.metrics.MetricRegistryImpl   - Error while
reporting metrics
java.util.ConcurrentModificationException
at
java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
at 
java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
at java.util.HashSet.<init>(HashSet.java:120)
at
org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
at
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
at
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
at
org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper.getValue(FlinkGaugeWrapper.java:36)
at
metrics_influxdb.measurements.MeasurementReporter.fromGauge(MeasurementReporter.java:163)
at
metrics_influxdb.measurements.MeasurementReporter.report(MeasurementReporter.java:55)
at
org.apache.flink.dropwizard.ScheduledDropwizardReporter.report(ScheduledDropwizardReporter.java:231)
at
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Any ideas on what might be the problem?




-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Metric Registry Warnings

2018-03-20 Thread PedroMrChaves
Hello,

I still have the same issue with Flink Version 1.4.2.

java.lang.IllegalArgumentException: A metric named
.taskmanager.6aa8d13575228d38ae4abdfb37fa229e.CDC.Source:
EVENTS.1.numRecordsIn already exists
at
com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
at
org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
at
org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:319)
at
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
at
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:303)
at
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:293)
at
org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:40)
at
org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:48)
at
org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:146)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:182)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:136)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Updating Job dependencies without restarting Flink

2018-03-20 Thread Fabian Hueske
Hi,

I'm quite sure that is not supported.
You'd have to take a savepoint and restart the application.
Depending on the sink system, you could start the new job before shutting
the old job down.
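
For illustration, a minimal sketch of the usual savepoint-based redeploy,
assuming the updated dependencies are bundled into the job jar (job id and
paths below are placeholders):

# trigger a savepoint and cancel the running job
bin/flink cancel -s hdfs:///flink/savepoints <jobId>
# resubmit the rebuilt jar (with the new dependencies) from that savepoint
bin/flink run -s hdfs:///flink/savepoints/savepoint-<id> path/to/updated-job.jar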

Best, Fabian

2018-03-20 10:31 GMT+01:00 Rohil Surana :

> Hi,
>
> We have a lot of jobs on Flink cluster that are using some common
> dependencies and I wanted to know if there is a way to place those
> dependencies in the flink lib folder so they will be available to the
> applications without restarting the flink cluster, so that next time a job
> is started latest versions of  those dependencies are available for use. We
> are using Flink 1.4.0
>
> Any help in this matter is appreciated.
>


Re: Restart hook and checkpoint

2018-03-20 Thread Fabian Hueske
Well, that's not that easy to do, because checkpoints must be coordinated
and triggered by the JobManager.
Also, the checkpointing mechanism with flowing checkpoint barriers (to
ensure checkpoint consistency) won't work once a task failed because it
cannot continue processing and forward barriers. If the task failed with an
OOME, the whole JVM is gone anyway.
I don't think it is possible to take something like a consistent rescue
checkpoint in case of a failure.

It might be possible to checkpoint the application state of non-failed tasks,
but this would result in data loss for the failed task, and we would need to
weigh the use cases for such a feature against the implementation effort.
Maybe there are better ways to address such use cases.

Best, Fabian

2018-03-20 6:43 GMT+01:00 makeyang :

> currently there is only time based way to trigger a checkpoint. based on
> this
> discussion, I think flink need to introduce event based way to trigger
> checkpoint such as restart a task manager should be count as a event.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: unable to addSource to StreamExecutionEnvironment?

2018-03-20 Thread Fabian Hueske
Thanks for reporting back!

2018-03-20 10:42 GMT+01:00 James Yu :

> Just found out that the IDE seems to auto-import the wrong class.
> While "org.apache.flink.streaming.api.datastream.DataStream" is required,
> "org.apache.flink.streaming.api.scala.DataStream" was imported.
>
> This is a UTF-8 formatted mail
> ---
> James C.-C.Yu
> +886988713275 <+886%20988%20713%20275>
>
> 2018-03-20 16:55 GMT+08:00 James Yu :
>
>> Hi,
>>
>> I am following the Taxi example provided on "
>> http://training.data-artisans.com/exercises/taxiData.html", however, I
>> got the following error message when I copy addSource line into my Intellij
>> IDE.
>>
>> error message -->
>> Incompatible types. Required DataStream<TaxiRide> but 'addSource' was
>> inferred to DataStreamSource<OUT>: no instance(s) of type variable(s) OUT
>> exist so that DataStreamSource<OUT> conforms to DataStream<TaxiRide>
>>
>> addSource line -->
>> DataStream<TaxiRide> rides = env.addSource(
>>   new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay,
>> servingSpeed));
>>
>> My question would be: how do we addSource in latest Flink? I am
>> running flink-1.4.2.
>>
>>
>> This is a UTF-8 formatted mail
>> ---
>> James C.-C.Yu
>> +886988713275 <+886%20988%20713%20275>
>>
>
>


Re: Let BucketingSink roll file on each checkpoint

2018-03-20 Thread Fabian Hueske
Hi,

The BucketingSink closes files once they have reached a certain size (BatchSize)
or have not been written to for a certain amount of time
(InactiveBucketThreshold).
While being written to, files are in an in-progress state and are only moved to
the completed state once they are closed. When that happens, other systems can pick
up the file and process it.
Processing a non-closed file would cause many problems.

However, closing files on every checkpoint would likely result in many
small files which HDFS doesn't support so well.
You can of course take the BucketingSink code and adapt it to your use case.
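
For illustration, a minimal sketch of the two existing knobs mentioned above
(Flink 1.4 API; the path and thresholds are placeholder values, and "stream"
stands for the DataStream<String> being written):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/output");
sink.setBatchSize(128 * 1024 * 1024);           // roll a new part file after ~128 MB
sink.setInactiveBucketThreshold(60 * 1000);     // close buckets idle for 60 s ...
sink.setInactiveBucketCheckInterval(60 * 1000); // ... checked every 60 s
stream.addSink(sink);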

Best, Fabian

2018-03-20 2:13 GMT+01:00 XilangYan :

> The behavior of BucketingSink is not exactly what we want.
> If I understood correctly, when a checkpoint is requested, BucketingSink will
> flush the writer to make sure data is not lost, but it will not close the
> file, nor roll a new file after the checkpoint.
> In the case of HDFS, if the file length is not updated to the name node
> (through closing the file or updating the file length explicitly), MR or
> other data analysis tools will not read the new data. This is not what we
> desire.
> I also want to open a new file for each checkpoint period to make sure the
> HDFS file is persistent, because we have hit some bugs in the flush/append
> HDFS use case.
>
> Is there any way to let BucketingSink roll the file on each checkpoint? Thanks
> in
> advance.
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Flink remote debug not working

2018-03-20 Thread Ankit Chaudhary
Hey Guys,

From Flink 1.4.+ onwards, I am somehow not able to use the JVM args for remote
debug, i.e.,
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=".

I am using: env.java.opts:
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=" in
flink-conf.yaml. When I try to restart the cluster, the task managers are
not started back again.

In the out log of the task managers, I can see following error:

ERROR: transport error 202: bind failed: Address already in use
ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports initialized
[debugInit.c:750]

Now, when I try to find the process where the port is already used, I find
that the PID belongs to the JobManager process. Is this intended behavior,
or am I missing something?
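
For reference, one possible way around the port clash is to give the
JobManager and TaskManager JVMs different debug ports via the per-component
options instead of the global env.java.opts (the port numbers below are
arbitrary examples):

env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"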

Regards, Ankit


Re: Kafka ProducerFencedException after checkpointing

2018-03-20 Thread Dongwon Kim
Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used
the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where
Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like transaction is
not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski 
wrote:

> Hi,
>
> What's your Kafka transaction timeout setting? Please check both the Kafka
> producer configuration (transaction.timeout.ms property) and the Kafka broker
> configuration. The most likely cause of such an error message is when Kafka's
> timeout is smaller than Flink's checkpoint interval and transactions are
> not committed quickly enough before the timeout occurs.
>
> Piotrek
>
> On 17 Mar 2018, at 07:24, Dongwon Kim  wrote:
>
>
> Hi,
>
> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th,
> 7th, ... checkpoints:
>
> --
>
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
> attempted an operation with an old epoch. Either there is a newer producer 
> with the same transactionalId, or the producer's transaction has been expired 
> by the broker.
>
> --
>
>
> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing
> using Kafka sink.
> We use FsStateBackend to store snapshot data on HDFS.
>
> As shown in configuration.png, my checkpoint configuration is:
> - Checkpointing Mode : Exactly Once
> - Interval : 15m 0s
> - Timeout : 10m 0s
> - Minimum Pause Between Checkpoints : 5m 0s
> - Maximum Concurrent Checkpoints : 1
> - Persist Checkpoints Externally : Disabled
>
> After the first checkpoint completed [see history after 1st ckpt.png], the
> job is restarted due to the ProducerFencedException [see exception after
> 1st ckpt.png].
> The first checkpoint takes less than 2 minutes while my checkpoint
> interval is 15m and minimum pause between checkpoints is 5m.
> After the job is restarted, the second checkpoint is triggered after a
> while [see history after 2nd ckpt.png] and this time I've got no exception.
> The third checkpoint results in the same exception as after the first
> checkpoint.
>
> Can anybody let me know what's going wrong behind the scene?
>
> Best,
>
> Dongwon
> <attachments: configuration.png, history after 1st ckpt.png, history after 2nd ckpt.png, exception after 1st ckpt.png>
>
>
>


Re: [ANNOUNCE] Apache Flink 1.3.3 released

2018-03-20 Thread Philip Luppens
Hi everyone,

Thanks, but I don’t see the binaries for 1.3.3 being pushed anywhere in the
Maven repositories [1]. Can we expect them to show up over there as well
eventually?

[1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-java/

Kind regards,

-Phil


On Fri, Mar 16, 2018 at 3:36 PM, Stephan Ewen  wrote:

> This release fixed a quite critical bug that could lead to loss of
> checkpoint state: https://issues.apache.org/jira/browse/FLINK-7783
>
> We recommend all users on Flink 1.3.2 to upgrade to 1.3.3
>
>
> On Fri, Mar 16, 2018 at 10:31 AM, Till Rohrmann 
> wrote:
>
>> Thanks for managing the release Gordon and also thanks to the community!
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 16, 2018 at 9:05 AM, Fabian Hueske 
>> wrote:
>>
>>> Thanks for managing this release Gordon!
>>>
>>> Cheers, Fabian
>>>
>>> 2018-03-15 21:24 GMT+01:00 Tzu-Li (Gordon) Tai :
>>>
 The Apache Flink community is very happy to announce the release of
 Apache Flink 1.3.3, which is the third bugfix release for the Apache Flink
 1.3 series.

 Apache Flink® is an open-source stream processing framework for
 distributed, high-performing, always-available, and accurate data streaming
 applications.

 The release is available for download at:
 https://flink.apache.org/downloads.html

 Please check out the release blog post for an overview of the
 improvements for this bugfix release:
 https://flink.apache.org/news/2018/03/15/release-1.3.3.html

 The full release notes are available in Jira:
 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12341142

 We would like to thank all contributors of the Apache Flink community
 who made this release possible!

 Cheers,
 Gordon


>>>
>>
>


-- 
"We cannot change the cards we are dealt, just how we play the hand." -
Randy Pausch


Re: unable to addSource to StreamExecutionEnvironment?

2018-03-20 Thread James Yu
Just found out that the IDE seems to auto-import the wrong class.
While "org.apache.flink.streaming.api.datastream.DataStream" is required,
"org.apache.flink.streaming.api.scala.DataStream" was imported.

This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275

2018-03-20 16:55 GMT+08:00 James Yu :

> Hi,
>
> I am following the Taxi example provided on "http://training.data-
> artisans.com/exercises/taxiData.html", however, I got the following error
> message when I copy addSource line into my Intellij IDE.
>
> error message -->
> Incompatible types. Required DataStream<TaxiRide> but 'addSource' was
> inferred to DataStreamSource<OUT>: no instance(s) of type variable(s) OUT
> exist so that DataStreamSource<OUT> conforms to DataStream<TaxiRide>
>
> addSource line -->
> DataStream<TaxiRide> rides = env.addSource(
>   new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));
>
> My question would be: how do we addSource in latest Flink? I am
> running flink-1.4.2.
>
>
> This is a UTF-8 formatted mail
> ---
> James C.-C.Yu
> +886988713275 <+886%20988%20713%20275>
>


Help Required for Log4J

2018-03-20 Thread Puneet Kinra
Hi

I have a use case in which I want to log bad records to a separate log file. I
have configured log4j and the log file is getting generated, but the records
are also going to the Flink logs. I want to detach them from the Flink logs
and write them only to my own log file.

Here is the configuration.
(Note: AMSSource is the custom written adaptor here.)

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file
log4j.logger.amssource=DEBUG, amssourceAppender

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to
manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=D:\\logs\\flink-log
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file


#BonusPointAppender
log4j.appender.bonuspointAppender=org.apache.log4j.RollingFileAppender
log4j.appender.bonuspointAppender.MaxFileSize=1024MB
log4j.appender.bonuspointAppender.MaxBackupIndex=10
log4j.appender.bonuspointAppender.Append=true
log4j.appender.bonuspointAppender.File=D:\\logs\\flink-bpuser-bonus.logs
#log4j.appender.bonuspointAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.bonuspointAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.bonuspointAppender.layout.ConversionPattern=%d [%t] %-5p (%C %M:%L) %x - %m%n

#AMSSourceAppender
log4j.appender.amssourceAppender=org.apache.log4j.RollingFileAppender
log4j.appender.amssourceAppender.MaxFileSize=1024MB
log4j.appender.amssourceAppender.MaxBackupIndex=10
log4j.appender.amssourceAppender.Append=true
log4j.appender.amssourceAppender.File=D:\\logs\\flink-bpuser-bonus-amssource.logs
#log4j.appender.amssourceAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.amssourceAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.amssourceAppender.layout.ConversionPattern=%d [%t] %-5p (%C %M:%L) %x - %m%n
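
For reference: with log4j 1.x a logger also forwards its events to the root
logger (and therefore to the Flink log file) unless additivity is switched
off. A minimal sketch of the relevant lines, assuming the adaptor logs through
a logger whose name starts with "amssource":

log4j.logger.amssource=DEBUG, amssourceAppender
log4j.additivity.amssource=false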




-- 
Cheers,

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


Re: Kafka ProducerFencedException after checkpointing

2018-03-20 Thread Piotr Nowojski
Hi,

What's your Kafka transaction timeout setting? Please check both the Kafka
producer configuration (transaction.timeout.ms property) and the Kafka broker
configuration. The most likely cause of such an error message is when Kafka's
timeout is smaller than Flink's checkpoint interval and transactions are not
committed quickly enough before the timeout occurs.
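
For illustration, a minimal sketch of passing a larger producer-side timeout
to the exactly-once Kafka 0.11 sink (topic, servers and schema below are
placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
// must stay at or below the broker's transaction.max.timeout.ms
props.setProperty("transaction.timeout.ms", "900000"); // 15 minutes

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "output-topic",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);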

Piotrek

> On 17 Mar 2018, at 07:24, Dongwon Kim  wrote:
> 
> 
> Hi,
> 
> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
> 7th, ... checkpoints:
> --
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
> attempted an operation with an old epoch. Either there is a newer producer 
> with the same transactionalId, or the producer's transaction has been expired 
> by the broker.
> --
> 
> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing 
> using Kafka sink.
> We use FsStateBackend to store snapshot data on HDFS.
> 
> As shown in configuration.png, my checkpoint configuration is:
> - Checkpointing Mode : Exactly Once
> - Interval : 15m 0s
> - Timeout : 10m 0s
> - Minimum Pause Between Checkpoints : 5m 0s
> - Maximum Concurrent Checkpoints : 1
> - Persist Checkpoints Externally : Disabled
> 
> After the first checkpoint completed [see history after 1st ckpt.png], the 
> job is restarted due to the ProducerFencedException [see exception after 1st 
> ckpt.png].
> The first checkpoint takes less than 2 minutes while my checkpoint interval 
> is 15m and minimum pause between checkpoints is 5m.
> After the job is restarted, the second checkpoint is triggered after a while 
> [see history after 2nd ckpt.png] and this time I've got no exception.
> The third checkpoint results in the same exception as after the first 
> checkpoint.
> 
> Can anybody let me know what's going wrong behind the scene?
> 
> Best,
> 
> Dongwon
> <attachments: configuration.png, history after 1st ckpt.png, history after 2nd ckpt.png, exception after 1st ckpt.png>



Re: Migration to Flip6 Kubernetes

2018-03-20 Thread Till Rohrmann
Hi Edward,

you're right that Flink's Kubernetes documentation has not been updated
with respect to Flip-6. This will be one of the tasks during the Flink 1.5
release testing and is still pending.

A Flink cluster can be run in two modes: session mode vs per-job mode. The
former starts a cluster to which you can submit multiple jobs. The cluster
shares the same ResourceManager and a Dispatcher which is responsible for
spawning JobMasters which execute a single job each. The latter starts a
Flink cluster which is pre-initialized with a JobGraph and only runs this
job. Here we also start a ResourceManager and a MiniDispatcher whose job it
is to simply start a single JobMaster with the pre-initialized JobGraph.

StandaloneSessionClusterEntrypoint is the entrypoint for the session mode.

The JobClusterEntrypoint is the entrypoint for the per-job mode. Take a
look at YarnJobClusterEntrypoint to see how the entrypoint retrieves the
JobGraph from HDFS and then automatically starts executing it. There is no
script which directly starts this entrypoint, but the YarnClusterDescriptor
uses it when `deployJobCluster` is called.

Depending on what you want to achieve (either building generic K8s images to
which you can submit any number of Flink jobs, or having a special image
which contains the single job you want to execute), you either have to call
into the SessionClusterEntrypoint or the JobClusterEntrypoint. When you start
a session cluster, you can use bin/flink run to submit jobs to it.
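
For illustration, a sketch of the session-mode submission path (the address
and port are placeholders and depend on how the cluster is exposed):

# any number of jobs can be submitted to a running session cluster
bin/flink run -m <jobmanager-address>:<port> path/to/user-job.jar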

Let me know if you have other questions.

Cheers,
Till

On Thu, Mar 15, 2018 at 7:53 PM, Edward Rojas 
wrote:

> Hello,
>
> Currently I have a Flink 1.4 cluster running on kubernetes based on the
> configuration describe on
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/deployment/kubernetes.html
> with additional config for HA with Zookeeper.
>
> With this I have several Taskmanagers, a single Jobmanager and I create a
> container for each job to perform the Job submission and manage Job updates
> with savepoints.
>
>
> I'm looking into what would be needed to migrate to the new architecture on
> FLIP6 as we are planning to use Flink 1.5 once it's ready.
>
> If I understand correctly from
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> and the current code on master:
>
> * TaskManagers would continue the same, i.e. they will execute the
> taskmanager.sh start-foreground script, which with the flip6 mode activated
> will execute the new taskexecutor.TaskManagerRunner.
>
> * We will now have one JobManager per job, which is really good; but I don't
> fully understand how this would be started.
>
> I notice that the jobmanager.sh with flip6 mode activated will execute
> entrypoint.StandaloneSessionClusterEntrypoint but I don't see how we could
> pass the job jar and parameters (?)
>
> So I think the other possibility to start the job would be via the /flink
> run/ command, maybe with an option to tell that we are creating a job with a
> job manager, or would this be the default behaviour?
>
> Or would this be the role of the JobMaster? I didn't take a look at its
> code but it's mentioned on the Flip-6 page. (However, I don't see an
> entrypoint from the scripts?)
>
> Could you help me to understand how this is expected to be done ?
>
>
> * Also I'm not sure I understand whether it would be better to have a
> ResourceManager per job or a single ResourceManager per cluster, as the
> page states that there is a ResourceManager for
> Self-contained-single-job, but it seems to me that it needs to have the
> information about all JobManagers and TaskManagers (?)
>
>
> Thanks in advance for the help you could provide.
>
> I'm interested in using Flip6 on kubernetes when it will be ready, so I
> could help with some testing if needed.
>
> --
> Edward
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: pyflink not working

2018-03-20 Thread Chesnay Schepler

I've commented in the linked JIRA, let's move this discussion there.

On 20.03.2018 10:00, Ganesh Manal wrote:


Hi,

   Not able to execute the pyflink job using the pyflink script. 
Similar to already logged issue –


https://issues.apache.org/jira/browse/FLINK-8909 



My question would be: how will we be able to execute the pyflink job? 
I am running flink-1.4.0.


Thanks & Regards,
Ganesh Manal





RE: pyflink not working

2018-03-20 Thread Ganesh Manal
Forgot to mention, the pyflink job executes locally but not when executed on
YARN.
Same is mentioned in - https://issues.apache.org/jira/browse/FLINK-8909

Thanks & Regards,
Ganesh Manal

From: Ganesh Manal
Sent: Tuesday, March 20, 2018 2:31 PM
To: user@flink.apache.org 
Subject: pyflink not working

Hi,

   Not able to execute the pyflink job using the pyflink script. Similar to 
already logged issue -
   https://issues.apache.org/jira/browse/FLINK-8909

   My question would be: how will we be able to execute the pyflink job? I am 
running flink-1.4.0.

Thanks & Regards,
Ganesh Manal



pyflink not working

2018-03-20 Thread Ganesh Manal
Hi,

   Not able to execute the pyflink job using the pyflink script. Similar to 
already logged issue –
   https://issues.apache.org/jira/browse/FLINK-8909

   My question would be: how will we be able to execute the pyflink job? I am 
running flink-1.4.0.

Thanks & Regards,
Ganesh Manal



unable to addSource to StreamExecutionEnvironment?

2018-03-20 Thread James Yu
Hi,

I am following the Taxi example provided on "
http://training.data-artisans.com/exercises/taxiData.html", however, I got
the following error message when I copy addSource line into my Intellij IDE.

error message -->
Incompatible types. Required DataStream<TaxiRide> but 'addSource' was
inferred to DataStreamSource<OUT>: no instance(s) of type variable(s) OUT
exist so that DataStreamSource<OUT> conforms to DataStream<TaxiRide>

addSource line -->
DataStream<TaxiRide> rides = env.addSource(
  new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));

My question would be: how do we addSource in latest Flink? I am
running flink-1.4.2.


This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275