[jira] [Created] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2018-08-21 Thread Luka Jurukovski (JIRA)
Luka Jurukovski created FLINK-10195:
---

 Summary: RabbitMQ Source With Checkpointing Doesn't Backpressure 
Correctly
 Key: FLINK-10195
 URL: https://issues.apache.org/jira/browse/FLINK-10195
 Project: Flink
  Issue Type: Bug
  Components: RabbitMQ Connector
Affects Versions: 1.6.0, 1.5.1, 1.5.0, 1.4.0
Reporter: Luka Jurukovski


The connection between the RabbitMQ server and the client does not apply 
backpressure appropriately when auto-acking is disabled. This becomes very 
problematic when a downstream process throttles data processing to a rate 
slower than the rate at which RabbitMQ sends data to the client.

The backlog of records ends up being stored in Flink's heap space, which 
grows indefinitely (or technically up to "Integer Max" deliveries). Looking at 
RabbitMQ's metrics, the number of unacked messages forms a steadily rising 
sawtooth shape.

Upon further investigation, it looks like this is due to how the 
QueueingConsumer works: messages are added to the BlockingQueue faster than 
they are removed and processed, resulting in the previously described 
behavior.

This may be intended behavior; however, it is not explicitly stated in the 
documentation or in any of the examples I have seen.
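
For reference, a minimal, hedged sketch (not the connector's internals, just the plain RabbitMQ Java client) of the knob that provides backpressure when auto-ack is disabled: a prefetch count caps the number of unacked deliveries, so the broker stops sending once the consumer falls behind. Host, queue name, and prefetch value below are placeholders.
{code:scala}
import com.rabbitmq.client.{AMQP, ConnectionFactory, DefaultConsumer, Envelope}

object PrefetchSketch {
  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost("localhost") // placeholder broker address
    val connection = factory.newConnection()
    val channel = connection.createChannel()

    // With auto-ack disabled, the prefetch count caps how many unacked
    // messages the broker will push, which is what provides backpressure.
    channel.basicQos(100)

    def process(body: Array[Byte]): Unit = {
      // stand-in for slow downstream processing
    }

    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String,
                                  envelope: Envelope,
                                  properties: AMQP.BasicProperties,
                                  body: Array[Byte]): Unit = {
        process(body)
        channel.basicAck(envelope.getDeliveryTag, false) // ack frees a prefetch slot
      }
    }
    channel.basicConsume("my-queue", false, consumer) // autoAck = false
  }
}
{code}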



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10194) Serialization issue with Scala's AggregateDataSet[Row]

2018-08-21 Thread Alexis Sarda-Espinosa (JIRA)
Alexis Sarda-Espinosa created FLINK-10194:
-

 Summary: Serialization issue with Scala's AggregateDataSet[Row]
 Key: FLINK-10194
 URL: https://issues.apache.org/jira/browse/FLINK-10194
 Project: Flink
  Issue Type: Bug
 Environment: Flink v1.6.0
Reporter: Alexis Sarda-Espinosa


 

Consider the following code, where I had to jump through some hoops to manually 
create a DataSet[Row] that allows using groupBy and sum as shown:
{code:scala}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.types.Row

object Main {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val letters = Seq("a", "a", "b").map(Row.of(_, 1.asInstanceOf[Object]))
    val typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)

    import scala.collection.JavaConverters._
    val inputFormat = new CollectionInputFormat(letters.asJavaCollection,
      typeInfo.createSerializer(env.getConfig))

    val source = new DataSource(env.getJavaEnv,
      inputFormat,
      typeInfo,
      "hello.flink.Main$.main(Main.scala:20)")

    val dataSet = new DataSet(source)

    dataSet.print()

    dataSet
      .groupBy(0)
      .sum(1)
      .print()
  }
}{code}
The call to dataSet.print() works as expected, but the final print() throws an 
exception:
{noformat}
Caused by: java.lang.ClassCastException: 
org.apache.flink.api.java.typeutils.runtime.RowSerializer cannot be cast to 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
at 
org.apache.flink.api.scala.operators.ScalaAggregateOperator$AggregatingUdf.open(ScalaAggregateOperator.java:260){noformat}
Changing the final print() to collect() throws the same exception.
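
As a hedged workaround sketch (not a fix for the underlying bug), the aggregation can be expressed as an explicit reduce on Row, which bypasses the Scala aggregate operator whose AggregatingUdf performs the TupleSerializerBase cast. This assumes the same {{dataSet}} and imports as in the example above.
{code:scala}
dataSet
  .groupBy(0)
  .reduce { (r1, r2) =>
    // keep the key field, add the integer fields manually
    Row.of(r1.getField(0),
      Int.box(r1.getField(1).asInstanceOf[Int] + r2.getField(1).asInstanceOf[Int]))
  }
  .print()
{code}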

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10193) Default RPC timeout is used when triggering savepoint via JobMasterGateway

2018-08-21 Thread Gary Yao (JIRA)
Gary Yao created FLINK-10193:


 Summary: Default RPC timeout is used when triggering savepoint via 
JobMasterGateway
 Key: FLINK-10193
 URL: https://issues.apache.org/jira/browse/FLINK-10193
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.6.0, 1.5.3
Reporter: Gary Yao
Assignee: Gary Yao


When calling {{JobMasterGateway#triggerSavepoint(String, boolean, Time)}}, the 
default timeout is used because the {{Time}} parameter of the method is not 
annotated with {{@RpcTimeout}}.

*Expected behavior*
* timeout for the RPC should be {{RpcUtils.INF_TIMEOUT}}
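
A minimal sketch of the intended shape (illustrative only: the real JobMasterGateway is a Java interface in flink-runtime, and the trait name below is made up): annotating the {{Time}} parameter with {{@RpcTimeout}} lets the RPC framework pick up the caller-supplied timeout, and the caller can then pass {{RpcUtils.INF_TIMEOUT}}.
{code:scala}
import java.util.concurrent.CompletableFuture

import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.rpc.RpcTimeout

// Sketch of the desired signature; the trait name is illustrative.
trait SavepointTriggeringGateway {
  def triggerSavepoint(
      targetDirectory: String,
      cancelJob: Boolean,
      @RpcTimeout timeout: Time): CompletableFuture[String]
}
{code}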



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread Timo Walther

Hi Wangsan,

the behavior of DataStreamRel#translateToPlan is more or less intended. 
That's why you call `toAppendStream` on the table environment: it adds your 
pipeline (from the source to the current operator) to the environment.


However, the explain() method should not cause those side-effects.

Regards,
Timo

Am 21.08.18 um 17:29 schrieb wangsan:

Hi Timo,

I think this may not only affect the explain() method. 
DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode 
into a DataStream or DataSet, and it adds the required operators to the execution 
environment. By side effect, I mean that if we call DataStreamRel#translateToPlan 
on the same RelNode several times, the same operators are added to the execution 
environment more than once, although we only need them once. Correct me if I 
misunderstood that.

I will open an issue later today if this is indeed a problem.

Best,
wangsan




On Aug 21, 2018, at 10:16 PM, Timo Walther  wrote:

Hi,

this sounds like a bug to me. Maybe the explain() method is not implemented 
correctly. Can you open an issue for it in Jira?

Thanks,
Timo


Am 21.08.18 um 15:04 schrieb wangsan:

Hi all,

I noticed that DataStreamRel#translateToPlan is not idempotent, and that 
may cause the execution plan to differ from what we expect. Every time we call 
DataStreamRel#translateToPlan (in TableEnvironment#explain, 
TableEnvironment#writeToSink, etc.), the same operators are added to the 
execution environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan?

Best,  Wangsan

appendix

 tenv.registerTableSource("test_source", sourceTable)

 val t = tenv.sqlQuery("SELECT * from test_source")
 println(tenv.explain(t))
 println(tenv.explain(t))

 implicit val typeInfo = TypeInformation.of(classOf[Row])
 tenv.toAppendStream(t)
 println(tenv.explain(t))
We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 2 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 3 : Operator
 content : Map
 ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 2 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 3 : Operator
 content : Map
 ship_strategy : FORWARD

Stage 4 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 5 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 6 : Operator
 content : Map
 ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 2 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 3 : Operator
 content : Map
 ship_strategy : FORWARD

Stage 4 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 5 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 6 : Operator
 content : Map
 ship_strategy : FORWARD

Stage 7 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 8 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 9 : Operator
 content : Map
 ship_strategy : FORWARD

 Stage 10 : Operator
 content : to: Row
 ship_strategy : FORWARD

Stage 11 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 12 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 13 : Operator
 content : Map
 ship_strategy : FORWARD








Re: Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread wangsan
Hi Timo, 

I think this may not only affect the explain() method. 
DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode 
into a DataStream or DataSet, and it adds the required operators to the execution 
environment. By side effect, I mean that if we call DataStreamRel#translateToPlan 
on the same RelNode several times, the same operators are added to the execution 
environment more than once, although we only need them once. Correct me if I 
misunderstood that.

I will open an issue later today if this is indeed a problem.

Best,
wangsan



> On Aug 21, 2018, at 10:16 PM, Timo Walther  wrote:
> 
> Hi,
> 
> this sounds like a bug to me. Maybe the explain() method is not implemented 
> correctly. Can you open an issue for it in Jira?
> 
> Thanks,
> Timo
> 
> 
> Am 21.08.18 um 15:04 schrieb wangsan:
>> Hi all,
>> 
>> I noticed that DataStreamRel#translateToPlan is not idempotent, and that
>> may cause the execution plan to differ from what we expect. Every time we call
>> DataStreamRel#translateToPlan (in TableEnvironment#explain,
>> TableEnvironment#writeToSink, etc.), the same operators are added to the
>> execution environment again.
>> 
>> Should we eliminate the side effect of DataStreamRel#translateToPlan?
>> 
>> Best,  Wangsan
>> 
>> appendix
>> 
>> tenv.registerTableSource("test_source", sourceTable)
>> 
>> val t = tenv.sqlQuery("SELECT * from test_source")
>> println(tenv.explain(t))
>> println(tenv.explain(t))
>> 
>> implicit val typeInfo = TypeInformation.of(classOf[Row])
>> tenv.toAppendStream(t)
>> println(tenv.explain(t))
>> We call explain three times, and the Physical Execution Plans are all
>> different.
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
>> source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 2 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 3 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
>> source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 2 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 3 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 4 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 5 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 6 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
>> source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 2 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 3 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 4 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 5 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 6 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 7 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 8 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 9 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 10 : Operator
>> content : to: Row
>> ship_strategy : FORWARD
>> 
>> Stage 11 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 12 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 13 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> 



[jira] [Created] (FLINK-10192) SQL Client table visualization mode does not update correctly

2018-08-21 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-10192:
-

 Summary: SQL Client table visualization mode does not update 
correctly
 Key: FLINK-10192
 URL: https://issues.apache.org/jira/browse/FLINK-10192
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0
Reporter: Fabian Hueske


The table visualization mode does not seem to update correctly.
When I run a query that groups and aggregates on a few (6) distinct keys, the 
client visualizes some keys multiple times. The aggregated values also do not 
seem to be correct.
Due to the small number of keys, these get updated frequently.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10191) WindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow

2018-08-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10191:
-

 Summary: 
WindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow
 Key: FLINK-10191
 URL: https://issues.apache.org/jira/browse/FLINK-10191
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.7.0


{{WindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow}} failed 
on Travis.

https://api.travis-ci.org/v3/job/418629694/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS][TABLE] How to handle empty delete for UpsertSource

2018-08-21 Thread Xingcan Cui
Hi Hequn,

Thanks for this discussion.

Personally, I’m also in favor of option 3. There are two reasons for that:

(1) A proctime-based upsert table source does not guarantee the records’ order, 
which means empty delete messages may not really be "empty". Simply discarding 
them may cause semantic problems.
(2) Materializing the table in the source doesn't sound like an efficient 
solution, especially considering that the downstream operators may also need to 
materialize the intermediate tables many times.

Therefore, why not choose a "lazy strategy", i.e., just forward the messages 
and let the operators that are sensitive to empty deletes handle them?

As for the filtering problem, maybe the best approach would be to cache all the 
keys that meet the criteria and send a retract message when that changes.

BTW, recently I have been getting a stronger and stronger feeling that maybe we 
should merge the retract message and the upsert message into a unified 
“update message” (Append Stream vs. Update Stream).

Best,
Xingcan

> On Aug 20, 2018, at 7:51 PM, Piotr Nowojski  wrote:
> 
> Hi,
> 
> Thanks for bringing up this issue here.
> 
> I’m not sure whether sometimes swallowing empty deletes could be a problem or 
> always swallowing/forwarding them is better. I guess for most use cases it 
> doesn't matter. Maybe the best for now would be to always forward them, since 
> if they are a problem, user could handle them somehow, either in custom sink 
> wrapper or in system that’s downstream from Flink. Also maybe we could have 
> this configurable in the future.
> 
> However this thing seems to me like a much lower priority compared to 
> performance implications. Forcing upsert source to always keep all of the 
> keys on the state is not only costly, but in many cases it can be a blocker 
> from executing a query at all. Not only for the UpsertSource -> Calc -> 
> UpsertSink, but also for example in the future for joins or ORDER BY 
> (especially with LIMIT) as well.
> 
> I would apply same reasoning to FLINK-9528.
> 
> Piotrek
> 
>> On 19 Aug 2018, at 08:21, Hequn Cheng  wrote:
>> 
>> Hi all,
>> 
>> Currently, I am working on FLINK-8577 Implement proctime DataStream to
>> Table upsert conversion .
>> And a design doc can be found here
>> .
>> It received many valuable suggestions. Many thanks to all of you.
>> However there are some problems I think may need more discussion.
>> 
>> *Terms*
>> 
>>  1. *Upsert Stream:* Stream that includes a key definition and will be
>>  updated. Message types include insert, update and delete.
>>  2. *Upsert Source:* Source that ingests an Upsert Stream.
>>  3. *Empty Delete:* For a specific key, the first message is a delete
>>  message.
>> 
>> *Problem to be discussed*
>> How to handle empty deletes for UpsertSource?
>> 
>> *Ways to solve the problem*
>> 
>>   1. Throw away empty delete messages in the UpsertSource (personally in
>>      favor of this option)
>>      - advantages
>>        - This makes sense in semantics. An empty table + delete message is
>>          still an empty table. Losing deletes does not affect the final
>>          results.
>>        - At present, the operators or functions in Flink are assumed to
>>          process the add message first and then the delete. Throwing away
>>          empty deletes in the source means the downstream operators do not
>>          need to consider empty deletes.
>>      - disadvantages
>>        - Maintaining the state in the source is expensive, especially for
>>          some simple SQL like: UpsertSource -> Calc -> UpsertSink.
>>   2. Throw away empty delete messages when the source generates
>>      retractions, otherwise pass empty delete messages down
>>      - advantages
>>        - The downstream operator does not need to consider empty delete
>>          messages when the source generates retractions.
>>        - Performance is better since the source doesn't have to maintain
>>          state if it doesn't generate retractions.
>>      - disadvantages
>>        - Judging whether a downstream operator will receive empty delete
>>          messages is complicated. Not only the source must be considered,
>>          but also the operators that follow it. Take join as an example:
>>          for the SQL upsert_source -> upsert_join, the join receives empty
>>          deletes, while in upsert_source -> group_by -> upsert_join it
>>          doesn't, since empty deletes are ingested by group_by.
>>        - The semantics of how empty deletes are processed are not clear.
>>          This may be hard for users to understand, because sometimes empty
>>          deletes are passed down and sometimes they aren't.
>>   3. Pass empty delete messages down always
>>      - advantages
>>        - Performance is better since the source doesn't have to maintain
>>          state if it doesn't generate 

Re: Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread Timo Walther

Hi,

this sounds like a bug to me. Maybe the explain() method is not 
implemented correctly. Can you open an issue for it in Jira?


Thanks,
Timo


Am 21.08.18 um 15:04 schrieb wangsan:

Hi all,

I noticed that DataStreamRel#translateToPlan is not idempotent, and that 
may cause the execution plan to differ from what we expect. Every time we call 
DataStreamRel#translateToPlan (in TableEnvironment#explain, 
TableEnvironment#writeToSink, etc.), the same operators are added to the 
execution environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan?

Best,  Wangsan

appendix

 tenv.registerTableSource("test_source", sourceTable)

 val t = tenv.sqlQuery("SELECT * from test_source")
 println(tenv.explain(t))
 println(tenv.explain(t))

 implicit val typeInfo = TypeInformation.of(classOf[Row])
 tenv.toAppendStream(t)
 println(tenv.explain(t))
We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 2 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 3 : Operator
 content : Map
 ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 2 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 3 : Operator
 content : Map
 ship_strategy : FORWARD

Stage 4 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 5 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 6 : Operator
 content : Map
 ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 2 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 3 : Operator
 content : Map
 ship_strategy : FORWARD

Stage 4 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 5 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 6 : Operator
 content : Map
 ship_strategy : FORWARD

Stage 7 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 8 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 9 : Operator
 content : Map
 ship_strategy : FORWARD

 Stage 10 : Operator
 content : to: Row
 ship_strategy : FORWARD

Stage 11 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 12 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 13 : Operator
 content : Map
 ship_strategy : FORWARD






Re: [ANNOUNCE] Apache Flink 1.5.3 released

2018-08-21 Thread Till Rohrmann
Great news. Thanks a lot for managing the release Chesnay and to all who
have contributed to this release.

Cheers,
Till

On Tue, Aug 21, 2018 at 2:12 PM Chesnay Schepler  wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.5.3, which is the third bugfix release for the Apache
> Flink 1.5 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> https://flink.apache.org/news/2018/08/21/release-1.5.3.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343777
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Chesnay
>


Re: Support Hadoop 2.6 for StreamingFileSink

2018-08-21 Thread Artsem Semianenka
Thanks Kostas for the reply,

As long as there are distributions like Cloudera whose latest version (5.15)
is based on Hadoop 2.6, I and many other Cloudera users are obliged to use an
older HDFS version. Moreover, I read a discussion on the Cloudera forum about
moving to a more recent Hadoop version, and the Cloudera folks said they are
not going to do that because they are concentrating on the 6th version, which
is based on Hadoop 3.x. At the same time, I doubt that Flink is ready to work
with the latest Hadoop 3.x version. As a result, my company, as a Cloudera
user, is in a trap: we placed a bet on Flink but can't use it with our
environment.

I will think about your idea of a RecoverableStream without truncate for bulk
encoders. To tell the truth, I currently have no idea how to implement it,
because idiomatically a RecoverableWriter should be able to recover from a
specified pointer. In our case, for the Parquet bulk format, we don't need to
recover; we should recreate the whole file from the checkpointed state. That
doesn't really look like a RecoverableWriter.

Cheers,
Artsem


On Tue, 21 Aug 2018 at 16:09, Kostas Kloudas 
wrote:

> Hi Artsem,
>
> Till is correct in that getting rid of the “valid-length” file was a
> design decision
> for the new StreamingFileSink since the beginning. The motivation was that
> users were reporting that essentially it was very cumbersome to use.
>
> In general, when the BucketingSink gets deprecated, I could see a benefit
> in having a
> legacy recoverable stream just in case you are obliged to use an older
> HDFS version.
> But, at least for now, this would be useful only for row-wise encoders,
> and NOT for
> bulk-encoders like Parquet.
>
> The reason is that for now, when using bulk encoders you roll on every
> checkpoint.
> This implies that you do not need truncate, or the valid length file.
> Given this,
> you may only need to write a Recoverable stream that just does not
> truncate.
>
> Would you like to try it out and see if it works for your usecase?
>
> Cheers,
> Kostas
>
> On Aug 21, 2018, at 1:58 PM, Artsem Semianenka 
> wrote:
>
> Thanks for the reply, Till!
>
> By the way, if Flink is going to support compatibility with Hadoop 2.6, I
> don't see another way to achieve it.
> As I mentioned before, one of the popular distributions, Cloudera, is still
> based on Hadoop 2.6, and it would be very sad if Flink did not support it.
> I really want to help the Flink community support this legacy. But currently
> I see only one way to achieve it: emulate the 'truncate' logic by recreating
> a new file with the needed length and replacing the old one.
>
> Cheers,
> Artsem
>
> On Tue, 21 Aug 2018 at 14:41, Till Rohrmann  wrote:
>
>> Hi Artsem,
>>
>> if I recall correctly, then we explicitly decided to not support the valid
>> file length files with the new StreamingFileSink because they are really
>> hard to handle for the user. I've pulled Klou into this conversation who
>> is
>> more knowledgeable and can give you a bit more advice.
>>
>> Cheers,
>> Till
>>
>> On Mon, Aug 20, 2018 at 2:53 PM Artsem Semianenka > >
>> wrote:
>>
>> > I have an idea to create new version of
>> HadoopRecoverableFsDataOutputStream
>> > class (for example with name LegacyHadoopRecoverableFsDataOutputStream
>> :) )
>> > which will works with valid-length files without invoking truncate. And
>> > modify check in HadoopRecoverableWriter to use
>> > LegacyHadoopRecoverableFsDataOutputStream in case if Hadoop version is
>> > lower then 2.7 . I will try to provide PR soon if no objections. I hope
>> I
>> > am on the right way.
>> >
>> > On Mon, 20 Aug 2018 at 14:40, Artsem Semianenka > >
>> > wrote:
>> >
>> > > Hi guys !
>> > > I have a question regarding new StreamingFileSink (introduced in 1.6
>> > > version) . We use this sink to write data into Parquet format. But I
>> > faced
>> > > with issue when trying to run job on Yarn cluster and save result to
>> > HDFS.
>> > > In our case we use latest Cloudera distributive (CHD 5.15) and it
>> > contains
>> > > HDFS 2.6.0  . This version is not support truncate method . I would
>> like
>> > to
>> > > create Pull request but I want to ask your advice how better design
>> this
>> > > fix and which ideas are behind this decision . I saw similiar PR for
>> > > BucketingSink https://github.com/apache/flink/pull/6108 . Maybe I
>> could
>> > > also add support of valid-length files for older Hadoop versions ?
>> > >
>> > > P.S.Unfortently CHD 5.15 (with Hadoop 2.6) is the latest version of
>> > > Cloudera distributive and we can't upgrade hadoop to 2.7 Hadoop .
>> > >
>> > > Best regards,
>> > > Artsem
>> > >
>> >
>> >
>> > --
>> >
>> > С уважением,
>> > Артем Семененко
>> >
>>
>
>
> --
>
> С уважением,
> Артем Семененко
>
>
>

-- 

С уважением,
Артем Семененко


[jira] [Created] (FLINK-10190) Unable to use custom endpoint in Kinesis producer

2018-08-21 Thread Sergei Poganshev (JIRA)
Sergei Poganshev created FLINK-10190:


 Summary: Unable to use custom endpoint in Kinesis producer
 Key: FLINK-10190
 URL: https://issues.apache.org/jira/browse/FLINK-10190
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.6.0
Reporter: Sergei Poganshev


There's a check in 
[KinesisConfigUtil|https://github.com/apache/flink/blob/7d034d4ef6986ba5ccda6f5e8c587b8fdd88be8e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java#L269]
 that validates that either AWS_REGION or AWS_ENDPOINT is specified 
(not both), while the Kinesis producer requires a region in any case (even with 
a custom endpoint).

Also the error message for that validation outputs AWS_REGION twice.
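
For illustration, a hedged sketch of the producer configuration this issue is about — a region together with a custom endpoint (e.g. a local Kinesis emulator) — which the check above currently rejects; the endpoint URL and credentials below are placeholders.
{code:scala}
import java.util.Properties

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

object KinesisProducerConfigSketch {
  def producerConfig: Properties = {
    val config = new Properties()
    config.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    // Custom endpoint, e.g. a local Kinesis emulator; URL is a placeholder.
    config.put(AWSConfigConstants.AWS_ENDPOINT, "https://localhost:4567")
    config.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "placeholder-key")
    config.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "placeholder-secret")
    config
  }
}
{code}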



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS][TABLE] How to handle empty delete for UpsertSource

2018-08-21 Thread Fabian Hueske
Hi,

Thanks for starting this discussion, Hequn!
These are tricky questions.

1) Handling empty deletes in UpsertSource.
I think forwarding empty deletes would only make a semantic difference if
the output is persisted in a non-empty external table, e.g., a Cassandra
table with existing entries.
If we forwarded the delete, we might remove data from the sink table.
This could be a desired effect.
Therefore, I think we should be able to forward empty deletes, and filtering
them out could be a configurable option.

2) Handling upsert messages in Filters.
I think the problem is better described with the following query,
which writes to an upsert sink:

SELECT user, count(*) FROM clicks GROUP BY user HAVING count(*) < 10

As long as the count for a user is smaller than 10, an update is passed to
the upsert sink.
When the count reaches 10, the Filter would need to convert the update
message into a delete message (right now, the filter just removes the
message completely).
This would happen for every update of the count, i.e., the upsert sink
would need to handle many delete messages for data that is no longer in
the external storage (repeated deletes).
There are multiple ways to handle this issue:
* Make the Filter stateful and only convert the first message into a delete
and filter all subsequent updates. (Could also be a best effort cache,
LRU...)
* Make the UpsertSink operator (optionally) stateful and track all deleted
entries. (Could also be a best-effort cache, LRU...)
* For the (common?) special case of Aggregation -> Filter, we could offer a
dedicated operator that applies the filter within the aggregation.

---

So, we are dealing with two cases here:
1) Deleting what has not been ingested yet (although the result might be in
the external sink).
I would forward deletes and filter them optionally, i.e., approach 3 by
default, with approach 1 as an option.

2) Deleting what has been deleted already.
I think having a best-effort cache in the Filter might be the best
approach. The GROUP-BY-HAVING operator might be a nice addition.
IMO, we should not give guarantees that an UpsertSink won't receive
repeated deletes.
If this is a problem for certain sink systems, we could give an option for
an exact filter based on state.
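
To make that concrete, here is a minimal sketch of such a stateful filter, written against the DataStream API rather than the internal Table runtime (the class name and the (key, count) record shape are illustrative): it remembers per key whether the predicate last held, forwards updates while it holds, emits a single delete when it stops holding, and swallows repeated deletes.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Emits (key, count, isDelete): updates while the predicate holds, one delete
// when it stops holding, nothing for repeated misses.
class FilterToRetractionSketch(predicate: Long => Boolean)
    extends KeyedProcessFunction[String, (String, Long), (String, Long, Boolean)] {

  private var passed: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    passed = getRuntimeContext.getState(
      new ValueStateDescriptor("passed", classOf[java.lang.Boolean]))
  }

  override def processElement(
      value: (String, Long),
      ctx: KeyedProcessFunction[String, (String, Long), (String, Long, Boolean)]#Context,
      out: Collector[(String, Long, Boolean)]): Unit = {
    val nowPasses = predicate(value._2)
    val previouslyPassed = passed.value() != null && passed.value()
    if (nowPasses) {
      out.collect((value._1, value._2, false))  // forward the upsert
    } else if (previouslyPassed) {
      out.collect((value._1, value._2, true))   // first miss: emit a delete
    }                                           // repeated misses: swallow
    passed.update(nowPasses)
  }
}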

What do you think?

Best, Fabian




2018-08-20 13:51 GMT+02:00 Piotr Nowojski :

> Hi,
>
> Thanks for bringing up this issue here.
>
> I’m not sure whether sometimes swallowing empty deletes could be a problem
> or always swallowing/forwarding them is better. I guess for most use cases
> it doesn't matter. Maybe the best for now would be to always forward them,
> since if they are a problem, user could handle them somehow, either in
> custom sink wrapper or in system that’s downstream from Flink. Also maybe
> we could have this configurable in the future.
>
> However this thing seems to me like a much lower priority compared to
> performance implications. Forcing upsert source to always keep all of the
> keys on the state is not only costly, but in many cases it can be a blocker
> from executing a query at all. Not only for the UpsertSource -> Calc ->
> UpsertSink, but also for example in the future for joins or ORDER BY
> (especially with LIMIT) as well.
>
> I would apply same reasoning to FLINK-9528.
>
> Piotrek
>
> > On 19 Aug 2018, at 08:21, Hequn Cheng  wrote:
> >
> > Hi all,
> >
> > Currently, I am working on FLINK-8577 Implement proctime DataStream to
> > Table upsert conversion  jira/browse/FLINK-8577>.
> > And a design doc can be found here
> >  aWe0y7Xqd0c1zE/edit?usp=sharing>.
> > It received many valuable suggestions. Many thanks to all of you.
> > However there are some problems I think may need more discussion.
> >
> > *Terms*
> >
> >   1. *Upsert Stream:* Stream that include a key definition and will be
> >   updated. Message types include insert, update and delete.
> >   2. *Upsert Source:* Source that ingest Upsert Stream.
> >   3. *Empty Delete:* For a specific key, the first message is a delete
> >   message.
> >
> > *Problem to be discussed*
> > How to handle empty deletes for UpsertSource?
> >
> > *Ways to solve the problem*
> >
> >   1. Throw away empty delete messages in the UpsertSource (personally in
> >      favor of this option)
> >      - advantages
> >        - This makes sense in semantics. An empty table + delete message is
> >          still an empty table. Losing deletes does not affect the final
> >          results.
> >        - At present, the operators or functions in Flink are assumed to
> >          process the add message first and then the delete. Throwing away
> >          empty deletes in the source means the downstream operators do not
> >          need to consider empty deletes.
> >      - disadvantages
> >        - Maintaining the state in the source is expensive, especially for
> >          some simple SQL like: UpsertSource -> Calc -> UpsertSink.
> > 

Re: Support Hadoop 2.6 for StreamingFileSink

2018-08-21 Thread Kostas Kloudas
Hi Artsem,

Till is correct in that getting rid of the “valid-length” file was a design 
decision 
for the new StreamingFileSink since the beginning. The motivation was that 
users were reporting that essentially it was very cumbersome to use.

In general, when the BucketingSink gets deprecated, I could see a benefit in 
having a 
legacy recoverable stream just in case you are obliged to use an older HDFS 
version. 
But, at least for now, this would be useful only for row-wise encoders, and NOT 
for 
bulk-encoders like Parquet.

The reason is that for now, when using bulk encoders you roll on every 
checkpoint.
This implies that you do not need truncate, or the valid length file. Given 
this, 
you may only need to write a Recoverable stream that just does not truncate.

Would you like to try it out and see if it works for your usecase?

Cheers,
Kostas

> On Aug 21, 2018, at 1:58 PM, Artsem Semianenka  wrote:
> 
> Thanks for the reply, Till!
> 
> By the way, if Flink is going to support compatibility with Hadoop 2.6, I don't 
> see another way to achieve it. 
> As I mentioned before, one of the popular distributions, Cloudera, is still 
> based on Hadoop 2.6, and it would be very sad if Flink did not support it.
> I really want to help the Flink community support this legacy. But currently I 
> see only one way to achieve it: emulate the 'truncate' logic by recreating a 
> new file with the needed length and replacing the old one.
> 
> Cheers,
> Artsem
> 
> On Tue, 21 Aug 2018 at 14:41, Till Rohrmann  > wrote:
> Hi Artsem,
> 
> if I recall correctly, then we explicitly decided to not support the valid
> file length files with the new StreamingFileSink because they are really
> hard to handle for the user. I've pulled Klou into this conversation who is
> more knowledgeable and can give you a bit more advice.
> 
> Cheers,
> Till
> 
> On Mon, Aug 20, 2018 at 2:53 PM Artsem Semianenka  >
> wrote:
> 
> > I have an idea to create new version of HadoopRecoverableFsDataOutputStream
> > class (for example with name LegacyHadoopRecoverableFsDataOutputStream :) )
> > which will works with valid-length files without invoking truncate. And
> > modify check in HadoopRecoverableWriter to use
> > LegacyHadoopRecoverableFsDataOutputStream in case if Hadoop version is
> > lower then 2.7 . I will try to provide PR soon if no objections. I hope I
> > am on the right way.
> >
> > On Mon, 20 Aug 2018 at 14:40, Artsem Semianenka  > >
> > wrote:
> >
> > > Hi guys !
> > > I have a question regarding new StreamingFileSink (introduced in 1.6
> > > version) . We use this sink to write data into Parquet format. But I
> > faced
> > > with issue when trying to run job on Yarn cluster and save result to
> > HDFS.
> > > In our case we use latest Cloudera distributive (CHD 5.15) and it
> > contains
> > > HDFS 2.6.0  . This version is not support truncate method . I would like
> > to
> > > create Pull request but I want to ask your advice how better design this
> > > fix and which ideas are behind this decision . I saw similiar PR for
> > > BucketingSink https://github.com/apache/flink/pull/6108 
> > >  . Maybe I could
> > > also add support of valid-length files for older Hadoop versions ?
> > >
> > > P.S.Unfortently CHD 5.15 (with Hadoop 2.6) is the latest version of
> > > Cloudera distributive and we can't upgrade hadoop to 2.7 Hadoop .
> > >
> > > Best regards,
> > > Artsem
> > >
> >
> >
> > --
> >
> > С уважением,
> > Артем Семененко
> >
> 
> 
> -- 
> С уважением,
> Артем Семененко
> 



Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread wangsan
Hi all,

I noticed that DataStreamRel#translateToPlan is not idempotent, and that 
may cause the execution plan to differ from what we expect. Every time we call 
DataStreamRel#translateToPlan (in TableEnvironment#explain, 
TableEnvironment#writeToSink, etc.), the same operators are added to the 
execution environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan?

Best,  Wangsan

appendix

tenv.registerTableSource("test_source", sourceTable)

val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))

implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))
We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD

Stage 7 : Data Source
content : collect elements with CollectionInputFormat

Stage 8 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 9 : Operator
content : Map
ship_strategy : FORWARD

Stage 10 : Operator
content : to: Row
ship_strategy : FORWARD

Stage 11 : Data Source
content : collect elements with CollectionInputFormat

Stage 12 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 13 : Operator
content : Map
ship_strategy : FORWARD
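
One way to avoid the duplicated stages shown above would be to memoize the translation result per node, so that repeated calls (e.g. from explain()) reuse the already-translated operators instead of adding them to the environment again. A rough, generic sketch of that idea follows; the names are made up and this is not Flink's actual planner code.

import scala.collection.mutable

// Generic memoization wrapper: translate each node at most once and hand the
// cached result to later callers.
class TranslationCache[Node, Translated] {
  private val cache = mutable.Map.empty[Node, Translated]

  def getOrTranslate(node: Node)(translate: Node => Translated): Translated =
    cache.getOrElseUpdate(node, translate(node))
}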



Re: Flink checkpointing to Google Cloud Storage

2018-08-21 Thread Oleksandr Serdiukov
Now I am able to write checkpoints but cannot restore from them:

java.lang.NoClassDefFoundError:
com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
at 
com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
at 
com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)


My current setup:


<dependency>
  <groupId>com.google.cloud.bigdataoss</groupId>
  <artifactId>gcs-connector</artifactId>
  <version>hadoop2-1.9.5</version>
</dependency>



On Thu, Aug 16, 2018 at 11:55 AM, Oleksandr Serdiukov 
wrote:

> Hello All!
>
> I am trying to configure checkpoints for flink jobs in GCS.
> Unfortunately, it fails after submitting a job. I run it using
> docker-compose on my local machine.
>
> Any thoughts of it?
> Thanks!
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'gs'. The scheme is
> not directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:405)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
> at org.apache.flink.runtime.state.filesystem.
> FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> createCheckpointStorage(FsStateBackend.java:441)
> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.
> createCheckpointStorage(RocksDBStateBackend.java:379)
> at org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
> ... 33 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
> at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(
> UnsupportedSchemeFactory.java:64)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:401)
>
>
> Env configuration is like this:
>
> StreamExecutionEnvironment env = applicationContext.getBean(
> 

[ANNOUNCE] Apache Flink 1.5.3 released

2018-08-21 Thread Chesnay Schepler
The Apache Flink community is very happy to announce the release of 
Apache Flink 1.5.3, which is the third bugfix release for the Apache 
Flink 1.5 series.


Apache Flink® is an open-source stream processing framework for 
distributed, high-performing, always-available, and accurate data 
streaming applications.


The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the 
improvements for this bugfix release:

https://flink.apache.org/news/2018/08/21/release-1.5.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343777

We would like to thank all contributors of the Apache Flink community 
who made this release possible!


Regards,
Chesnay


Re: Support Hadoop 2.6 for StreamingFileSink

2018-08-21 Thread Artsem Semianenka
Thanks for the reply, Till!

By the way, if Flink is going to support compatibility with Hadoop 2.6, I
don't see another way to achieve it.
As I mentioned before, one of the popular distributions, Cloudera, is still
based on Hadoop 2.6, and it would be very sad if Flink did not support it.
I really want to help the Flink community support this legacy. But currently
I see only one way to achieve it: emulate the 'truncate' logic by recreating
a new file with the needed length and replacing the old one.

Cheers,
Artsem

On Tue, 21 Aug 2018 at 14:41, Till Rohrmann  wrote:

> Hi Artsem,
>
> if I recall correctly, then we explicitly decided to not support the valid
> file length files with the new StreamingFileSink because they are really
> hard to handle for the user. I've pulled Klou into this conversation who is
> more knowledgeable and can give you a bit more advice.
>
> Cheers,
> Till
>
> On Mon, Aug 20, 2018 at 2:53 PM Artsem Semianenka 
> wrote:
>
> > I have an idea to create new version of
> HadoopRecoverableFsDataOutputStream
> > class (for example with name LegacyHadoopRecoverableFsDataOutputStream
> :) )
> > which will works with valid-length files without invoking truncate. And
> > modify check in HadoopRecoverableWriter to use
> > LegacyHadoopRecoverableFsDataOutputStream in case if Hadoop version is
> > lower then 2.7 . I will try to provide PR soon if no objections. I hope I
> > am on the right way.
> >
> > On Mon, 20 Aug 2018 at 14:40, Artsem Semianenka 
> > wrote:
> >
> > > Hi guys !
> > > I have a question regarding new StreamingFileSink (introduced in 1.6
> > > version) . We use this sink to write data into Parquet format. But I
> > faced
> > > with issue when trying to run job on Yarn cluster and save result to
> > HDFS.
> > > In our case we use latest Cloudera distributive (CHD 5.15) and it
> > contains
> > > HDFS 2.6.0  . This version is not support truncate method . I would
> like
> > to
> > > create Pull request but I want to ask your advice how better design
> this
> > > fix and which ideas are behind this decision . I saw similiar PR for
> > > BucketingSink https://github.com/apache/flink/pull/6108 . Maybe I
> could
> > > also add support of valid-length files for older Hadoop versions ?
> > >
> > > P.S.Unfortently CHD 5.15 (with Hadoop 2.6) is the latest version of
> > > Cloudera distributive and we can't upgrade hadoop to 2.7 Hadoop .
> > >
> > > Best regards,
> > > Artsem
> > >
> >
> >
> > --
> >
> > С уважением,
> > Артем Семененко
> >
>


-- 

С уважением,
Артем Семененко


Re: SQL Client Limitations

2018-08-21 Thread Fabian Hueske
Hi Dominik,

The SQL Client supports the same subset of SQL that you get with Java /
Scala embedded queries.
The documentation [1] covers all supported operations.

There are some limitations because certain operators require special time
attributes (row time or processing time attributes) which are monotonically
increasing.
Some operators such as a regular join (in contrast to a time-windowed join)
remove the monotonicity property of time attributes such that time-based
operations cannot be applied anymore.
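
To make the distinction concrete, a hedged sketch (table, column, and variable names are made up; it assumes a StreamTableEnvironment `tenv` with Orders and Shipments registered and rowtime attributes ordertime/shiptime): a time-windowed (interval) join like the one below keeps a usable time attribute for later windowed operations, whereas a regular equi-join without the time bounds does not.

// Interval join: the join condition bounds the two rowtime attributes, so the
// result still carries a time attribute for subsequent windowed operations.
val shippedOrders = tenv.sqlQuery(
  """
    |SELECT o.orderId, o.ordertime, s.shiptime
    |FROM Orders o, Shipments s
    |WHERE o.orderId = s.orderId
    |  AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
  """.stripMargin)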

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sql.html



2018-08-21 13:27 GMT+02:00 Till Rohrmann :

> Hi Dominik,
>
> I think such a list would be really helpful. I've pulled Timo and Fabian
> into this conversation because they probably know more.
>
> Cheers,
> Till
>
> On Mon, Aug 20, 2018 at 12:43 PM Dominik Wosiński 
> wrote:
>
>> Hey,
>>
>> Do we have any list of current limitations of SQL Client available
>> somewhere or the only way is to go through JIRA issues?
>>
>> For example:
>> I tried to make a Group By Tumble Window and an Inner Join in one query and it
>> seems that it is not possible currently, and I was wondering whether it's
>> an issue with my query or a known limitation.
>>
>> Thanks,
>> Best Regards,
>> Dominik.
>>
>


[RESULT][VOTE] Release 1.5.3, release candidate #1

2018-08-21 Thread Chesnay Schepler

I'm happy to announce that we have unanimously approved this release.
There are 4 approving votes, 3 of which are binding:
* Till (binding)
* vino (non-binding)
* Gordon (binding)
* Chesnay (binding)
There are no disapproving votes.
Thanks everyone!

On 21.08.2018 13:53, Chesnay Schepler wrote:

+1

On 20.08.2018 17:58, Tzu-Li (Gordon) Tai wrote:

+1 (binding)

- verified checksum and gpg files
- verified source compiles (tests enabled), Scala 2.11 / without Hadoop
- e2e tests pass locally
- source release contains no binaries
- no missing release artifacts in staging area
- reviewed announcement PR, is LGTM

Cheers,
Gordon


On Sat, Aug 18, 2018 at 7:56 PM vino yang  wrote:


+1,

- checked out the source code of Flink v1.5.3-rc1 and packaged it (skipping
tests) successfully
- reviewed announcement blog post and gave a little comment
- ran flink-table's test successfully
- checked all modules' version number is 1.5.3

Thanks, vino.

Till Rohrmann wrote on Saturday, Aug 18, 2018 at 12:25 AM: 




+1 (binding)

- built Flink from source release with Hadoop version 2.8.4
- executed all end-to-end tests sucessfully
- executed Jepsen test suite successfully with binary release

Cheers,
Till

On Thu, Aug 16, 2018, 12:59 Chesnay Schepler  
wrote:



Hi everyone,
Please review and vote on the release candidate #1 for the version
1.5.3, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which 
includes:

* JIRA release notes [1],
* the official Apache source release and binary convenience 
releases to
be deployed to dist.apache.org [2], which are signed with the key 
with

fingerprint 11D464BA [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.5.3-rc1" [5],
* website pull request listing the new release and adding 
announcement

blog post [6].

The vote will be open for at least 72 hours. It is adopted by 
majority

approval, with at least 3 PMC affirmative votes.

Thanks,
Chesnay

[1]


https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343777 


[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.5.3/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4]

https://repository.apache.org/content/repositories/orgapacheflink-1179

[5]


https://gitbox.apache.org/repos/asf?p=flink.git;a=tag;h=refs/tags/release-1.5.3-rc1 


[6] https://github.com/apache/flink-web/pull/119













Re: [VOTE] Release 1.5.3, release candidate #1

2018-08-21 Thread Chesnay Schepler

+1

On 20.08.2018 17:58, Tzu-Li (Gordon) Tai wrote:

+1 (binding)

- verified checksum and gpg files
- verified source compiles (tests enabled), Scala 2.11 / without Hadoop
- e2e tests pass locally
- source release contains no binaries
- no missing release artifacts in staging area
- reviewed announcement PR, is LGTM

Cheers,
Gordon


On Sat, Aug 18, 2018 at 7:56 PM vino yang  wrote:


+1,

- checkout the source code of Flink v1.5.3-rc1 and packaged (skip tests)
successfully
- reviewed announcement blog post and gave a little comment
- ran flink-table's test successfully
- checked all modules' version number is 1.5.3

Thanks, vino.

Till Rohrmann wrote on Saturday, Aug 18, 2018 at 12:25 AM:


+1 (binding)

- built Flink from source release with Hadoop version 2.8.4
- executed all end-to-end tests sucessfully
- executed Jepsen test suite successfully with binary release

Cheers,
Till

On Thu, Aug 16, 2018, 12:59 Chesnay Schepler  wrote:


Hi everyone,
Please review and vote on the release candidate #1 for the version
1.5.3, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to
be deployed to dist.apache.org [2], which are signed with the key with
fingerprint 11D464BA [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.5.3-rc1" [5],
* website pull request listing the new release and adding announcement
blog post [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Chesnay

[1]



https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343777

[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.5.3/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4]

https://repository.apache.org/content/repositories/orgapacheflink-1179

[5]



https://gitbox.apache.org/repos/asf?p=flink.git;a=tag;h=refs/tags/release-1.5.3-rc1

[6] https://github.com/apache/flink-web/pull/119










Re: [Discussion] Move cluster creation functions from flink-cli.sh to other scripts

2018-08-21 Thread Till Rohrmann
How would that look in detail given that I am a user who wants to start a
job cluster?

On Tue, Aug 21, 2018 at 12:09 PM Renjie Liu  wrote:

> But it's also dangerous if users need to submit jobs via the same cluster
> configuration. I mean we can use another script for starting a job cluster,
> so that cluster administrators can separate environments.
> On Tue, Aug 21, 2018 at 5:53 PM Till Rohrmann 
> wrote:
>
> > Hi Renjie,
> >
> > do you mean to not support deploying Flink clusters via
> > `FLINK_HOME/bin/flink -m yarn-cluster`? I think starting a dedicated job
> > cluster for a given job that way is quite useful.
> >
> > Cheers,
> > Till
> >
> > On Thu, Aug 9, 2018 at 4:33 AM Renjie Liu 
> wrote:
> >
> > > Hi:
> > > Flink 1.5.0 brings a new deployment mode and we are able to provide a
> > > single cluster for our jobs which manages resources elastically. However,
> > > this also brings challenges for our cluster management since users can
> > > submit jobs or create a YARN cluster via the same flink-cli.sh.
> > > Here is the motivation case for this issue:
> > >
> > > 1. Admins deployed a mesos session cluster using zookeeper ha mode
> > > 2. End user can submit jobs to this cluster
> > > 3. User can also create yarn cluster using the same conf mistakenly,
> > which
> > > brings our mesos session cluster down
> > >
> > > I propose to separate cluster manager functionalities from
> flink-cli.sh,
> > > and put them into other scripts.
> > > --
> > > Liu, Renjie
> > > Software Engineer, MVAD
> > >
> >
> --
> Liu, Renjie
> Software Engineer, MVAD
>


Re: Support Hadoop 2.6 for StreamingFileSink

2018-08-21 Thread Till Rohrmann
Hi Artsem,

if I recall correctly, we explicitly decided not to support the valid-length
files with the new StreamingFileSink because they are really hard for users
to handle. I've pulled Klou into this conversation; he is more knowledgeable
and can give you a bit more advice.

Cheers,
Till

On Mon, Aug 20, 2018 at 2:53 PM Artsem Semianenka 
wrote:

> I have an idea to create a new version of the HadoopRecoverableFsDataOutputStream
> class (for example named LegacyHadoopRecoverableFsDataOutputStream :) )
> which would work with valid-length files without invoking truncate, and to
> modify the check in HadoopRecoverableWriter to use
> LegacyHadoopRecoverableFsDataOutputStream when the Hadoop version is
> lower than 2.7. I will try to provide a PR soon if there are no objections.
> I hope I am on the right track.
>
> On Mon, 20 Aug 2018 at 14:40, Artsem Semianenka 
> wrote:
>
> > Hi guys!
> > I have a question regarding the new StreamingFileSink (introduced in
> > version 1.6). We use this sink to write data in Parquet format, but I ran
> > into an issue when trying to run the job on a YARN cluster and save the
> > result to HDFS. In our case we use the latest Cloudera distribution
> > (CDH 5.15), which contains HDFS 2.6.0. This version does not support the
> > truncate method. I would like to create a pull request, but I want to ask
> > your advice on how best to design this fix and which ideas are behind the
> > existing design. I saw a similar PR for BucketingSink
> > https://github.com/apache/flink/pull/6108 . Maybe I could also add
> > support for valid-length files for older Hadoop versions?
> >
> > P.S. Unfortunately CDH 5.15 (with Hadoop 2.6) is the latest version of the
> > Cloudera distribution and we can't upgrade Hadoop to 2.7.
> >
> > Best regards,
> > Artsem
> >
>
>
> --
>
> Best regards,
> Артем Семененко
>
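
A minimal sketch of the version check behind the fallback discussed above. Only the Hadoop version probe is shown; LegacyHadoopRecoverableFsDataOutputStream is the hypothetical class proposed in this thread and does not exist in Flink, and the real HadoopRecoverableWriter internals may differ:
{code:java}
import org.apache.hadoop.util.VersionInfo;

// Sketch only: shows how a writer could decide between truncate-based recovery
// (Hadoop 2.7+) and a hypothetical valid-length-file fallback for older versions.
public class TruncateSupportCheck {

    /** Returns true if the running Hadoop version is at least 2.7, i.e. truncate() exists. */
    public static boolean truncateSupported() {
        // VersionInfo.getVersion() returns strings like "2.7.3" or "2.6.0-cdh5.15.0".
        String[] parts = VersionInfo.getVersion().split("\\.");
        int major = Integer.parseInt(parts[0].replaceAll("\\D", ""));
        int minor = Integer.parseInt(parts[1].replaceAll("\\D", ""));
        return major > 2 || (major == 2 && minor >= 7);
    }

    public static void main(String[] args) {
        System.out.println(truncateSupported()
                ? "use the truncate-based HadoopRecoverableFsDataOutputStream"
                : "fall back to a legacy stream writing .valid-length files");
    }
}
{code}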


Re: SQL Client Limitations

2018-08-21 Thread Till Rohrmann
Hi Dominik,

I think such a list would be really helpful. I've pulled Timo and Fabian
into this conversation because they probably know more.

Cheers,
Till

On Mon, Aug 20, 2018 at 12:43 PM Dominik Wosiński  wrote:

> Hey,
>
> Do we have any list of current limitations of SQL Client available
> somewhere or the only way is to go through JIRA issues?
>
> For example:
> I tried to combine a Group By Tumble Window and an Inner Join in one query
> and it seems that it is not possible currently, and I was wondering whether
> it's an issue with my query or a known limitation.
>
> Thanks,
> Best Regards,
> Dominik.
>


Re: [Proposal] Utilities for reading, transforming and creating Streaming savepoints

2018-08-21 Thread Till Rohrmann
big +1 for this feature. A tool to get your state out of and into Flink
will be tremendously helpful.

On Mon, Aug 20, 2018 at 10:21 AM Aljoscha Krettek 
wrote:

> +1 I'd like to have something like this in Flink a lot!
>
> > On 19. Aug 2018, at 11:57, Gyula Fóra  wrote:
> >
> > Hi all!
> >
> > Thanks for the feedback and I'm happy there is some interest :)
> > Tomorrow I will start improving the proposal based on the feedback and
> will
> > get back to work.
> >
> > If you are interested in working together on this, please ping me and we
> > can discuss some ideas/plans and how to share the work.
> >
> > Cheers,
> > Gyula
> >
> > Paris Carbone  wrote on Sat, Aug 18, 2018 at 9:03:
> >
> >> +1
> >>
> >> Might also be a good start to implement queryable stream state with
> >> snapshot isolation using that mechanism.
> >>
> >> Paris
> >>
> >>> On 17 Aug 2018, at 12:28, Gyula Fóra  wrote:
> >>>
> >>> Hi All!
> >>>
> >>> I want to share with you a little project we have been working on at
> King
> >>> (with some help from some dataArtisans folks). I think this would be a
> >>> valuable addition to Flink and solve a bunch of outstanding production
> >>> use-cases and headaches around state bootstrapping and state analytics.
> >>>
> >>> We have built a quick and dirty POC implementation on top of Flink 1.6,
> >>> please check the README for some nice examples to get a quick idea:
> >>>
> >>> https://github.com/king/bravo
> >>>
> >>> *Short story*
> >>> Bravo is a convenient state reader and writer library leveraging
> >>> Flink’s batch processing capabilities. It supports processing and
> writing
> >>> Flink streaming savepoints. At the moment it only supports processing
> >>> RocksDB savepoints but this can be extended in the future for other
> state
> >>> backends and checkpoint types.
> >>>
> >>> Our goal is to cover a few basic features:
> >>>
> >>>  - Converting keyed states to Flink DataSets for processing and
> >> analytics
> >>>  - Reading/Writing non-keyed operator states
> >>>  - Bootstrap keyed states from Flink DataSets and create new valid
> >>>  savepoints
> >>>  - Transform existing savepoints by replacing/changing some states
> >>>
> >>>
> >>> Some example use-cases:
> >>>
> >>>  - Point-in-time state analytics across all operators and keys
> >>>  - Bootstrap the state of a streaming job from external resources such
> >>>  as a database or filesystem
> >>>  - Validate and potentially repair corrupted state of a streaming job
> >>>  - Change max parallelism of a job
> >>>
> >>>
> >>> Our main goal is to start working together with other Flink production
> >>> users and make this something useful that can be part of Flink. So if
> you
> >>> have use-cases please talk to us :)
> >>> I have also started a google doc which contains a little bit more info
> >> than
> >>> the readme and could be a starting place for discussions:
> >>>
> >>>
> >>
> https://docs.google.com/document/d/103k6wPX20kMu5H3SOOXSg5PZIaYpwdhqBMr-ppkFL5E/edit?usp=sharing
> >>>
> >>> I know there are a bunch of rough edges and bugs (and no tests) but our
> >>> motto is: If you are not embarrassed, you released too late :)
> >>>
> >>> Please let me know what you think!
> >>>
> >>> Cheers,
> >>> Gyula
> >>
> >>
>
>


[jira] [Created] (FLINK-10189) FindBugs warnings: Inefficient use of keySet iterator instead of entrySet iterator

2018-08-21 Thread Hiroaki Yoshida (JIRA)
Hiroaki Yoshida created FLINK-10189:
---

 Summary: FindBugs warnings: Inefficient use of keySet iterator 
instead of entrySet iterator
 Key: FLINK-10189
 URL: https://issues.apache.org/jira/browse/FLINK-10189
 Project: Flink
  Issue Type: Bug
Reporter: Hiroaki Yoshida


FindBugs-3.0.1 ([http://findbugs.sourceforge.net/]) reported two 
WMI_WRONG_MAP_ITERATOR warnings on master:
{code:java}
M P WMI: org.apache.flink.runtime.state.ttl.TtlMapState.putAll(Map) makes 
inefficient use of keySet iterator instead of entrySet iterator  At 
TtlMapState.java:[line 72]
M P WMI: org.apache.flink.addons.hbase.HBaseTableSource.projectFields(int[]) 
makes inefficient use of keySet iterator instead of entrySet iterator  At 
HBaseTableSource.java:[line 19] 
{code}
The description of the bug is as follows:
{quote}*WMI: Inefficient use of keySet iterator instead of entrySet iterator 
(WMI_WRONG_MAP_ITERATOR)*
This method accesses the value of a Map entry, using a key that was retrieved 
from a keySet iterator. It is more efficient to use an iterator on the entrySet 
of the map, to avoid the Map.get(key) lookup.
[http://findbugs.sourceforge.net/bugDescriptions.html#WMI_WRONG_MAP_ITERATOR]
{quote}
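
For context, a minimal generic illustration of the pattern FindBugs flags here and the entrySet-based alternative (plain Java, not the actual TtlMapState / HBaseTableSource code):
{code:java}
import java.util.HashMap;
import java.util.Map;

public class EntrySetIterationExample {
    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("a", 1);
        counts.put("b", 2);

        // Flagged pattern: every iteration pays an extra counts.get(key) lookup.
        for (String key : counts.keySet()) {
            System.out.println(key + " -> " + counts.get(key));
        }

        // Preferred pattern: iterate the entry set and read key and value directly.
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
{code}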



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [Discussion] Move cluster creation functions from flink-cli.sh to other scripts

2018-08-21 Thread Renjie Liu
But it's also dangerous if users need to submit jobs via the same cluster
configuration. I mean we can use another script for starting the job cluster,
so that cluster administrators can separate environments.
On Tue, Aug 21, 2018 at 5:53 PM Till Rohrmann  wrote:

> Hi Renjie,
>
> do you mean to not support deploying Flink clusters via
> `FLINK_HOME/bin/flink -m yarn-cluster`? I think starting a dedicated job
> cluster for a given job that way is quite useful.
>
> Cheers,
> Till
>
> On Thu, Aug 9, 2018 at 4:33 AM Renjie Liu  wrote:
>
> > Hi:
> > Flink 1.5.0 brings a new deployment mode and we are able to provide a single
> > cluster for our jobs which manages resources elastically. However, this
> > also brings challenges for our cluster management, since users can submit
> > jobs or create a YARN cluster via the same flink-cli.sh.
> > Here is the motivating case for this issue:
> >
> > 1. Admins deploy a Mesos session cluster using ZooKeeper HA mode
> > 2. End users can submit jobs to this cluster
> > 3. Users can also mistakenly create a YARN cluster using the same conf,
> > which brings our Mesos session cluster down
> >
> > I propose to separate the cluster management functionality from
> > flink-cli.sh and put it into other scripts.
> > --
> > Liu, Renjie
> > Software Engineer, MVAD
> >
>
-- 
Liu, Renjie
Software Engineer, MVAD


Re: [Discussion] Move cluster creation functions from flink-cli.sh to other scripts

2018-08-21 Thread Till Rohrmann
Hi Renjie,

do you mean to not support deploying Flink clusters via
`FLINK_HOME/bin/flink -m yarn-cluster`? I think starting a dedicated job
cluster for a given job that way is quite useful.

Cheers,
Till

On Thu, Aug 9, 2018 at 4:33 AM Renjie Liu  wrote:

> Hi:
> Flink 1.5.0 brings a new deployment mode and we are able to provide a single
> cluster for our jobs which manages resources elastically. However, this
> also brings challenges for our cluster management, since users can submit
> jobs or create a YARN cluster via the same flink-cli.sh.
> Here is the motivating case for this issue:
>
> 1. Admins deploy a Mesos session cluster using ZooKeeper HA mode
> 2. End users can submit jobs to this cluster
> 3. Users can also mistakenly create a YARN cluster using the same conf, which
> brings our Mesos session cluster down
>
> I propose to separate the cluster management functionality from
> flink-cli.sh and put it into other scripts.
> --
> Liu, Renjie
> Software Engineer, MVAD
>


Re: [DISCUSS] Change the Keyed partitioning behavior of the Kafka Producer API

2018-08-21 Thread Tzu-Li (Gordon) Tai
Hi Niels,

Your conclusions are accurate, and I also agree that the combination of the
KeyedSerializationSchema / providing partitioners, etc. is all a bit awkward
in its current state.

As for the proposed solutions, I personally disagree with 1), since key
partitioning, IMO, should be the default behavior. There were actually
already discussions about making that happen once a Kafka connector remake
happens in the future.
And yes, 2) seems to be the best solution here.

To round up solution 2), we have:
- A constructor that takes a KeyedSerializationSchema, and NO partitioner.
This implicitly uses Kafka's key partitioning.
- A constructor that takes a SerializationSchema, and an
Optional<FlinkKafkaPartitioner>. By default, the `FlinkFixedPartitioner` is
used. If None is provided, then we use round-robin partitioning.
Though, this would be breaking because default partitioning behaviours for
the KeyedSerializationSchema variant would change.

I would vote against introducing a breaking change now, since key
partitioning is still achievable right now (although admittedly in a very
non-friendly way).
Instead, we only incorporate these ideas when the Kafka connector remake
happens.
There has already been thoughts in doing this, triggered by many other
aspects (reworking Flink's source interface, having a common abstraction
for efficient partition discovery / idleness detection in partition-based
replayable sources, etc. )

Overall, I think that this discussion also brings up another aspect of the
`KeyedSerializationSchema` - it bundles too many concerns within a single
interface.
1. It defines the serialization.
2. It extracts the partitioning key for each record (though it may never be
used), due to custom partitioning. This might have been better off with a
separate `KafkaKeyExtractor`, for example.
3. It decides the target topic for each record, which may be more suitable
in the `FlinkKafkaPartitioner` interface.

Cheers,
Gordon
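
A rough sketch of how the three concerns listed above could be split into separate interfaces. None of these types exist in Flink; the names, including `KafkaKeyExtractor`, are purely illustrative:
{code:java}
import java.io.Serializable;

// Hypothetical interfaces splitting the concerns currently bundled in
// KeyedSerializationSchema; none of these exist in Flink.
interface KafkaValueSerializer<T> extends Serializable {
    byte[] serializeValue(T element);      // concern 1: serialization
}

interface KafkaKeyExtractor<T> extends Serializable {
    byte[] extractKey(T element);          // concern 2: partitioning key (may go unused)
}

interface KafkaTopicSelector<T> extends Serializable {
    String getTargetTopic(T element);      // concern 3: per-record target topic
}
{code}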

On Sun, Aug 19, 2018 at 4:44 PM Niels Basjes  wrote:

> Hi,
>
> A while ago we found that if you construct a Kafka producer, it always uses
> the FlinkFixedPartitioner to spread the data across the Kafka partitions,
> except when you give it a custom partitioner.
>
> Because we want all our elements to be partitioned by the key of the
> records, we created this issue and put up a pull request with a
> simple FlinkKeyHashPartitioner.
>
> https://issues.apache.org/jira/browse/FLINK-9610
>
> A comment by one of the reviewers (Tzu-Li Tai) was essentially: "Kafka does
> this by default already, why this change?"
>
> So I dug a lot deeper to understand how the partitioning decisions and data
> flow from the Flink API down into the Kafka producer client code.
>
> My conclusions:
> 1) The Kafka producer code uses the provided partitioner; if it doesn't
> have one, it uses the hash of the key; if it doesn't have a key either, it
> does a round-robin distribution.
> 2) The Flink Kafka producer constructors are available in variants with
> and without a partitioner. Even if you provide a valid key for each record,
> it will still use the FlinkFixedPartitioner if no explicit partitioner has
> been specified.
>
> Looking at the code (I haven't tried it), you can actually get the desired
> behavior without any code changes by using the constructor that requires a
> partitioner and passing it a null value.
> Yuck!
>
> In my opinion, providing a KeyedSerializationSchema is an implicit way of
> specifying that you want to partition the data by that key.
>
> So to make this a workable situation I see three ways to handle this:
> 1) We merge something like the partitioner I proposed.
> 2) We change the constructors that get a KeyedSerializationSchema to use
> that key for partitioning.
> 3) We remove all constructors that have a KeyedSerializationSchema because
> the key is never used anyway.
>
> I think 3) is bad, 1) is OK, and 2), although it breaks backward
> compatibility, is the best solution.
>
> So to clarify the change I propose here:
> We change the behavior of all Flink producer constructors that have
> a KeyedSerializationSchema parameter and NO partitioner.
> The proposed change is that, because we HAVE a key and we do NOT have a
> partitioner, the partitioning is done by the partitioning code that already
> exists in the underlying Kafka.
>
> So for the rest of the constructors the behavior remains unchanged:
> - With a  NON-Keyed SerializationSchema
> - With a provided partitioner
>
> What do you guys think?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
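
For reference, a hedged sketch of the workaround Niels mentions above: using the constructor overload that takes an optional custom partitioner and passing an empty value, so that partitioning falls through to Kafka's own key hashing. The constructor signature and package paths below are assumptions based on the 0.11 connector and should be verified against the connector version in use:
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class KeyPartitionedProducerSketch {

    public static FlinkKafkaProducer011<String> createProducer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Minimal keyed schema for illustration: the record itself is used as key and value.
        KeyedSerializationSchema<String> schema = new KeyedSerializationSchema<String>() {
            @Override
            public byte[] serializeKey(String element) {
                return element.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public byte[] serializeValue(String element) {
                return element.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String getTargetTopic(String element) {
                return null; // fall back to the topic passed to the producer
            }
        };

        // Passing Optional.empty() attaches no Flink-side partitioner, so the record key
        // written by the schema is left to Kafka's default key-hash partitioning.
        return new FlinkKafkaProducer011<>(
                "my-topic",
                schema,
                props,
                Optional.<FlinkKafkaPartitioner<String>>empty());
    }
}
{code}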


[jira] [Created] (FLINK-10188) Solve nondeterministic functions problem for retraction

2018-08-21 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-10188:
---

 Summary: Solve nondeterministic functions problem for retraction
 Key: FLINK-10188
 URL: https://issues.apache.org/jira/browse/FLINK-10188
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Hequn Cheng


Currently, retraction does not take non-deterministic functions into account. For SQL
like:
{code}
source -> group by -> 
 non-window join -> retract_sink
source -> group by -> 
{code}
The group by will send retract messages to the join. However, if we add
LOCALTIMESTAMP between the group by and the join, messages cannot be retracted
correctly in the join, since the join retracts messages by matching the whole row.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10187) Fix LogicalUnnestRule to match Correlate/Uncollect correctly

2018-08-21 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-10187:
--

 Summary: Fix LogicalUnnestRule to match Correlate/Uncollect 
correctly
 Key: FLINK-10187
 URL: https://issues.apache.org/jira/browse/FLINK-10187
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0
Reporter: Shuyi Chen
Assignee: Shuyi Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)