Re: How to create schema for Flink table

2019-01-26 Thread Hequn Cheng
Hi Soheil,

DataSet can be converted to or from a Table. More details here:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table
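
For example, something along these lines (a minimal sketch; the Tuple2 element
type and the field names are only placeholders for your own transformed dataset):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class DataSetToTable {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // stand-in for your transformed dataset
        DataSet<Tuple2<String, Long>> transformed = env.fromElements(
                Tuple2.of("alice", 10L), Tuple2.of("bob", 20L));

        // the field expression string is the schema you define in advance
        Table orders = tEnv.fromDataSet(transformed, "userId, amount");
        tEnv.registerTable("Orders", orders);

        Table result = tEnv.sqlQuery(
                "SELECT userId, SUM(amount) FROM Orders GROUP BY userId");
        tEnv.toDataSet(result, Row.class).print();
    }
}
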
Let me know if you have any questions.

Best, Hequn

On Sun, Jan 27, 2019 at 5:37 AM Soheil Pourbafrani 
wrote:

> I want to do some transformation on raw data and then create a table from
> that. Is it possible to create a schema for the table (in advance) and then
> using that transformed dataset and schema create the table?
>


Re: Query on retract stream

2019-01-26 Thread Hequn Cheng
Hi Gagan,

Besides the event-time and processing-time difference, there is another
difference between the two ways: the window aggregate works on bounded data,
while the unbounded aggregate works on unbounded data, i.e., newly arriving
data can still update a very old result (for example, a record arriving today
can change a count computed for a key long ago).

As for performance, I think the two ways may show no big difference in the
current Flink version. Maybe you can run some tests on your own scenarios if
both of them can solve your problem. FYI: there is a nice discussion[1]
raised by Timo recently. Once Blink is merged into Flink, the unbounded
aggregate will be much faster than the window aggregate.
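
In case it helps with those tests: for the window-based variant, the time
attribute can be propagated through the first window with TUMBLE_ROWTIME so the
second window stays valid. A sketch only, assuming `orders` and your lastValue
UDAF are registered on `tableEnv`:

Table pendingPerHour = tableEnv.sqlQuery(
    "SELECT userId, COUNT(orderId) AS pendingCount " +
    "FROM ( " +
    "  SELECT orderId, " +
    "         TUMBLE_ROWTIME(orderTime, INTERVAL '1' HOUR) AS wTime, " +
    "         lastValue(userId) AS userId, " +
    "         lastValue(status) AS status " +
    "  FROM orders " +
    "  GROUP BY orderId, TUMBLE(orderTime, INTERVAL '1' HOUR) " +
    ") " +
    "WHERE status = 'PENDING' " +
    "GROUP BY TUMBLE(wTime, INTERVAL '1' HOUR), userId");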

Best,
Hequn

[1] https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-32


On Sat, Jan 26, 2019 at 4:11 PM Gagan Agrawal 
wrote:

> Thanks Hequn for the suggested solutions; I think this should really work,
> and I will give it a try. As I understand it, the first solution of using
> multiple windows will be good for scenarios where I want output to be
> generated once the window is materialized (i.e. the watermark reaches the end
> of the window), and the second will be good if I want it fired on a per-event
> basis (i.e. no watermarking). Apart from this, do you see any difference
> from a performance perspective in choosing between the two, or should both be
> equally performant?
>
> Gagan
>
> On Sat, Jan 26, 2019 at 11:50 AM Hequn Cheng  wrote:
>
>> Hi Gagan,
>>
>> Time attribute fields will be materialized by the unbounded groupby.
>> Also, currently, the window doesn't have the ability to handle retraction
>> messages. I see two ways to solve the problem.
>>
>> - Use multi-window.  The first window performs lastValue, the second
>> performs count.
>> - Use two non-window aggregates. In this case, you don't have to change
>> anything for the first aggregate. For the second one, you can group by an
>> hour field and perform count(). The code looks like:
>>
>> SELECT userId,
>>  count(orderId)
>> FROM
>> (SELECT orderId,
>>  getHour(orderTime) as myHour,
>>  lastValue(userId) AS userId,
>>  lastValue(status) AS status
>> FROM orders
>> GROUP BY  orderId, orderTime)
>> WHERE status='PENDING'
>> GROUP BY myHour, userId
>>
>> Best,
>> Hequn
>>
>>
>>
>>
>> On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal 
>> wrote:
>>
>>> Based on the suggestions in this mail thread, I tried out a few
>>> experiments on upsert streams with Flink 1.7.1, and here is the issue I am
>>> facing with the window stream.
>>>
>>> *1. Global Pending order count. *
>>> Following query works fine and it's able to handle updates as per
>>> original requirement.
>>>
>>> select userId, count(orderId) from
>>> (select orderId, lastValue(userId) as userId, lastValue(status) as
>>> status from orders group by orderId)
>>> where status='PENDING' group by userId
>>>
>>> *2. Last 1 Hour tumbling window count (Append stream)*
>>> Though following query doesn't handle upsert stream, I just tried to
>>> make sure time column is working fine. This is working, but as expected, it
>>> doesn't handle updates on orderId.
>>>
>>> select userId, count(orderId) from orders
>>> where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR),
>>> userId
>>>
>>> 3. *Last 1 Hour tumbling window count (With upsert stream)*
>>> Now I tried combination of above two where input stream is converted to
>>> upsert stream (via lastValue aggregate function) and then Pending count
>>> needs to be calculated in last 1 hour window.
>>>
>>> select userId, count(orderId) from
>>> (select orderId, orderTime, lastValue(userId) as userId,
>>> lastValue(status) as status from orders group by orderId, orderTime)
>>> where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR),
>>> userId
>>>
>>> This one gives me the following error. Is this because I have added
>>> orderTime to the group by/select clause and hence its time characteristics
>>> have changed? What is the workaround here, as without adding orderTime I
>>> cannot perform window aggregation on the upsert stream?
>>>
>>> [error] Exception in thread "main"
>>> org.apache.flink.table.api.ValidationException:* Window can only be
>>> defined over a time attribute column.*
>>> [error] at
>>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
>>> [error] at
>>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
>>> [error] at
>>> org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
>>> [error] at
>>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
>>> [error] at
>>> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
>>> [error] at
>>> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
>>> [error] at
>>> org.a

Re: Best pattern for achieving stream enrichment (side-input) from a large static source

2019-01-26 Thread Ken Krugler

> On Jan 26, 2019, at 1:08 PM, Nimrod Hauser  
> wrote:
> 
> Hey Ken, 
> 
> Thank you for your quick response! That definitely sounds like something 
> worth exploring.
> Just a few more small questions, if that's ok.
> 
> 1. You referred to the parquet source as a "stream", but what we have is a 
> static data-source which we will always want to "query" against.
> What we thought about doing is to stream the entire parquet dataset and 
> load it into our state. 
> Does that sound right, or is that "hacky"?

Not sure what you mean by “stream the entire parquet dataset”. Do you mean 
you’d load it into memory yourself, and then distribute it?

If so, then yes, you can do that, but you’d obviously have to re-load it
yourself, and partition it (since it’s also keyed, right?) yourself, etc.

> 2. Can the continuousFileMonitoringFunction be used to track an entire 
> directory of parquet files?

Yes.
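
For reference, the usual way to get that is env.readFile() in
PROCESS_CONTINUOUSLY mode, which uses the ContinuousFileMonitoringFunction
under the hood. A sketch (TextInputFormat is just a stand-in for whatever
Parquet input format you end up using; the path and interval are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TextInputFormat format = new TextInputFormat(new Path("hdfs:///data/enrichment/"));
DataStream<String> enrichment = env.readFile(
        format,
        "hdfs:///data/enrichment/",               // directory to monitor
        FileProcessingMode.PROCESS_CONTINUOUSLY,  // keep re-scanning for new files
        60_000L);                                 // scan interval in ms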

> Also, we'd like it to refresh its state (i.e. its internal data structures)
> every time the parquet folder is updated, but only after all new files have 
> been written (meaning, we'll need it to run once an update has been detected, 
> but not right away)
> Is that a reasonable use-case?

It’s a reasonable use case, but it precludes using the 
ContinuousFileMonitoringFunction.

You can write a custom SourceFunction, but where it gets tricky is handling 
failure recovery (checkpointing).

But I should also have mentioned the fundamental issue with this kind of 
enrichment in Flink - you can’t control the ordering of the two streams 
(easily), so you have to be prepared to buffer data from Kafka until you’ve got 
a complete set of data from Parquet.
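
A rough sketch of that buffering, using a CoProcessFunction on the two
connected, keyed streams (everything here is hypothetical: payloads are plain
Strings keyed by Tuple2.f0, and the "enrich" step is just string concatenation):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Use as: parquet.connect(kafka).keyBy(t -> t.f0, t -> t.f0).process(new BufferingEnricher())
public class BufferingEnricher
        extends CoProcessFunction<Tuple2<String, String>, Tuple2<String, String>, String> {

    private transient ValueState<String> enrichment;             // per-key enrichment value
    private transient ListState<Tuple2<String, String>> buffer;  // Kafka records waiting for it

    @Override
    public void open(Configuration parameters) {
        enrichment = getRuntimeContext().getState(
                new ValueStateDescriptor<>("enrichment", String.class));
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
    }

    // Parquet side: update the enrichment state and flush anything that was waiting
    @Override
    public void processElement1(Tuple2<String, String> e, Context ctx, Collector<String> out)
            throws Exception {
        enrichment.update(e.f1);
        for (Tuple2<String, String> waiting : buffer.get()) {
            out.collect(waiting.f1 + " enriched with " + e.f1);
        }
        buffer.clear();
    }

    // Kafka side: emit if the enrichment is already there, otherwise buffer the record
    @Override
    public void processElement2(Tuple2<String, String> event, Context ctx, Collector<String> out)
            throws Exception {
        String e = enrichment.value();
        if (e == null) {
            buffer.add(event);
        } else {
            out.collect(event.f1 + " enriched with " + e);
        }
    }
}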

We’d worked around a similar issue with a UnionedSources source function, but
I haven’t validated that it handles checkpointing correctly.

— Ken

 
> 
> And thank you once again.
> 
> Nimrod.
> 
> On Sat, Jan 26, 2019 at 7:10 PM Ken Krugler wrote:
> Hi Nimrod,
> 
> One approach is as follows…
> 
> 1. For the Parquet data, you could use a ContinuousFileMonitoringFunction
> to generate a stream of enrichment records.
> 
> 2. Then use a CoMapFunction to “merge” the two connected streams (Kafka and 
> Parquet), and output something like Tuple2>
> 
> 3. In your enrichment function, based on what’s in the Either<> you’re either 
> updating your enrichment state, or processing a record from Kafka.
> 
> But I think you might want to add a stateful function in the Parquet stream, 
> so that you can separately track Kafka record state in the enrichment 
> function.
> 
> There’s also the potential issue of wanting to buffer Kafka data in the 
> enrichment function, if you need to coordinate with the enrichment data (e.g. 
> you need to get a complete set of updated enrichment data before applying any 
> of it to the incoming Kafka data).
> 
> — Ken
> 
> 
> 
>> On Jan 26, 2019, at 8:04 AM, Nimrod Hauser wrote:
>> 
>> Hello,
>> 
>> We're using Flink on a high velocity data-stream, and we're looking for the 
>> best way to enrich our stream using a large static source (originating from 
>> Parquet files, which are rarely updated).
>> 
> The source for the enrichment weighs a few GBs, which is why we want to
>> avoid using techniques such as broadcast streams, which cannot be keyed and 
>> need to be duplicated for every Flink operator that is used.
>> 
>> We started looking into the possibility of merging streams with datasets, or 
>> using the Table API, but any best-practice that's been done before will be 
>> greatly appreciated.
>> 
>> I'm attaching a simple chart for convenience,
>> 
> Thank you very much,
>> 
>> Nimrod.
>> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: K8s and flink1.7.1

2019-01-26 Thread Vishal Santoshi
And both worked. I should have said:

   - modifying the docker entrypoint script to first configure
   *taskmanager.host*  using the status.podIP as an override or in
   flink-conf.yaml before the process is launched through the entry script.


Thank you all.

On Sat, Jan 26, 2019 at 4:11 PM Vishal Santoshi 
wrote:

> For 1. Thank you for pointing out that property. I surely overlooked it.
> For 2. Will try out the other options. It seems the suggestion that best
> suits us (we do not want to over-engineer on the init container side) is:
>
>- configure *metrics.internal.query-service.port* property to some
>fixed port (e.g. **)
>- modifying the docker entrypoint script to first configure
>*taskmanager.host*
>
>
> I think this is what you were referring to as a possible solution?
>
> The headless service would generally imply a single service for each TM,
> and that is not sustainable.
>
>
>
>
>
> On Sat, Jan 26, 2019 at 1:37 PM Nagarjun Guraja 
> wrote:
>
>> For 1. you need to set up high-availability.jobmanager.port as a
>> predefined port in your flink-conf.yaml and expose the port via
>> job-manager-deployment and job-manager-service resources as well. That
>> should do the trick.
>>
>> For 2. I am not sure of the timelines, but there are a few decent/not
>> hacky workarounds to get around the problem, mentioned in the comments.
>> Feel free to pick one to unblock yourselves.
>>
>> Regards,
>> Nagarjun
>>
>> *Success is not final, failure is not fatal: it is the courage to
>> continue that counts. *
>> *- Winston Churchill - *
>>
>>
>> On Sat, Jan 26, 2019 at 5:39 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> There are two issues with the 1.7.1 "job as a cluster" setup that I need
>>> guidance on.
>>>
>>> 1. In the HA setup, the TMs are not able to resolve the job manager's
>>> random port through the jobmanager.rpc.port
>>> setting. The setting does work in the non-HA mode (the containerPort
>>> /TCP with the same port facilitates that), but then we lose the job if
>>> the JM were to reboot. This is a high priority for us, and I am sure there
>>> is a workaround, but I would rather ask the experts.
>>>
>>> 2. The metrics on the JM are not visible, possibly due to
>>> https://issues.apache.org/jira/browse/FLINK-11127 . It is an open issue,
>>> and both the service-per-TM and the stateful-set approach appear
>>> non-production-ready (not scalable and kludgey). Do you have a timeline for
>>> when these will be resolved?
>>>
>>> Thanks.
>>>
>>> Vishal
>>>
>>


How to create schema for Flink table

2019-01-26 Thread Soheil Pourbafrani
I want to do some transformation on raw data and then create a table from
that. Is it possible to create a schema for the table (in advance) and then
using that transformed dataset and schema create the table?


Re: Best pattern for achieving stream enrichment (side-input) from a large static source

2019-01-26 Thread Nimrod Hauser
Hey Ken,

Thank you for your quick response! That definitely sounds like something
worth exploring.
Just a few more small questions, if that's ok.

1. You referred to the parquet source as a "stream", but what we have is a
static data-source which we will always want to "query" against.
What we thought about doing is to stream the entire parquet dataset and
load it into our state.
Does that sound right, or is that "hacky"?

2. Can the continuousFileMonitoringFunction be used to track an entire
directory of parquet files? Also, we'd like it to refresh its state (i.e.
its internal data structures) every time the parquet folder is updated,
but only after all new files have been written (meaning, we'll need it to
run once an update has been detected, but not right away)
Is that a reasonable use-case?

And thank you once again.

Nimrod.

On Sat, Jan 26, 2019 at 7:10 PM Ken Krugler 
wrote:

> Hi Nimrod,
>
> One approach is as follows…
>
> 1. For the Parquet data, you could use a ContinuousFileMonitoringFunction
> to generate a stream of enrichment records.
>
> 2. Then use a CoMapFunction to “merge” the two connected streams (Kafka
> and Parquet), and output something like Tuple2>
>
> 3. In your enrichment function, based on what’s in the Either<> you’re
> either updating your enrichment state, or processing a record from Kafka.
>
> But I think you might want to add a stateful function in the Parquet
> stream, so that you can separately track Kafka record state in the
> enrichment function.
>
> There’s also the potential issue of wanting to buffer Kafka data in the
> enrichment function, if you need to coordinate with the enrichment data
> (e.g. you need to get a complete set of updated enrichment data before
> applying any of it to the incoming Kafka data).
>
> — Ken
>
>
>
> On Jan 26, 2019, at 8:04 AM, Nimrod Hauser 
> wrote:
>
> Hello,
>
> We're using Flink on a high velocity data-stream, and we're looking for
> the best way to enrich our stream using a large static source (originating
> from Parquet files, which are rarely updated).
>
> The source for the enrichment weighs a few GBs, which is why we want to
> avoid using techniques such as broadcast streams, which cannot be keyed and
> need to be duplicated for every Flink operator that is used.
>
> We started looking into the possibility of merging streams with datasets,
> or using the Table API, but any best-practice that's been done before will
> be greatly appreciated.
>
> I'm attaching a simple chart for convenience,
>
> Thank you very much,
>
> Nimrod.
> 
>
>
> --
> Ken Krugler
> +1 530-210-6378
>
> 
> http://www.scaleunlimited.com
> 
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: K8s and flink1.7.1

2019-01-26 Thread Vishal Santoshi
For 1. Thank you for pointing out that property. I surely overlooked it.
For 2. Will try out the other options. It seems the suggestion that best
suits us (we do not want to over-engineer on the init container side) is:

   - configure *metrics.internal.query-service.port* property to some fixed
   port (e.g. **)
   - modifying the docker entrypoint script to first configure
   *taskmanager.host*


I think this is what you were referring to as a possible solution?

The headless service would generally imply a single service for each TM, and
that is not sustainable.





On Sat, Jan 26, 2019 at 1:37 PM Nagarjun Guraja  wrote:

> For 1. you need to set up high-availability.jobmanager.port as a predefined
> port in your flink-conf.yaml and expose the port via job-manager-deployment
> and job-manager-service resources as well. That should do the trick.
>
> For 2. I am not sure of the timelines, but there are a few decent/not
> hacky workarounds to get around the problem, mentioned in the comments.
> Feel free to pick one to unblock yourselves.
>
> Regards,
> Nagarjun
>
> *Success is not final, failure is not fatal: it is the courage to continue
> that counts. *
> *- Winston Churchill - *
>
>
> On Sat, Jan 26, 2019 at 5:39 AM Vishal Santoshi 
> wrote:
>
>> There are two issues with the 1.7.1 "job as a cluster" setup that I need
>> guidance on.
>>
>> 1. In the HA setup, the TMs are not able to resolve the job manager's random
>> port through the jobmanager.rpc.port
>> setting. The setting does work in the non-HA mode (the containerPort
>> /TCP with the same port facilitates that), but then we lose the job if
>> the JM were to reboot. This is a high priority for us, and I am sure there is
>> a workaround, but I would rather ask the experts.
>>
>> 2. The metrics on the JM are not visible, possibly due to
>> https://issues.apache.org/jira/browse/FLINK-11127 . It is an open issue
>> and both the service-per-TM and the stateful-set approach appear
>> non-production-ready (not scalable and kludgey). Do you have a timeline for
>> when these will be resolved?
>>
>> Thanks.
>>
>> Vishal
>>
>


Re: K8s and flink1.7.1

2019-01-26 Thread Nagarjun Guraja
For 1. you need to set up high-availability.jobmanager.port as a predefined
port in your flink-conf.yaml and expose the port via job-manager-deployment
and job-manager-service resources as well. That should do the trick.

For 2. I am not sure of the timelines, but there are a few decent/not hacky
workarounds to get around the problem, mentioned in the comments. Feel free
to pick one to unblock yourselves.

Regards,
Nagarjun

*Success is not final, failure is not fatal: it is the courage to continue
that counts. *
*- Winston Churchill - *


On Sat, Jan 26, 2019 at 5:39 AM Vishal Santoshi 
wrote:

> There are two issues with the 1.7.1 "job as a cluster" setup that I need
> guidance on.
>
> 1. In the HA setup, the TMs are not able to resolve the job manager's random
> port through the jobmanager.rpc.port
> setting. The setting does work in the non-HA mode (the containerPort
> /TCP with the same port facilitates that), but then we lose the job if
> the JM were to reboot. This is a high priority for us, and I am sure there is
> a workaround, but I would rather ask the experts.
>
> 2. The metrics on the JM are not visible, possibly due to
> https://issues.apache.org/jira/browse/FLINK-11127 . It is an open issue
> and both the service-per-TM and the stateful-set approach appear
> non-production-ready (not scalable and kludgey). Do you have a timeline for
> when these will be resolved?
>
> Thanks.
>
> Vishal
>


Re: StreamingFileSink Avro batch size and compression

2019-01-26 Thread Taher Koitawala
Thanks, I'll check it out.

On Sat 26 Jan, 2019, 10:48 PM Dawid Wysakowicz wrote:

> Hi,
>
> Generally speaking you can pass the batch size through RollingPolicy[1].
> Unfortunately bulk formats use the OnCheckpointRollingPolicy, and AFAIK it
> does not allow adjusting its behavior based on part size. Maybe Kostas has an
> idea how to do that in the least invasive way. For non-bulk formats you can
> have a look at [2].
>
> I assume that you were using AvroParquetWriter. You can specify the
> compression on the ParquetWriter, I guess the same way as before. The code
> for doing it can look something like this:
>
> StreamingFileSink.forBulkFormat(
> Path.fromLocalFile(folder),
> new ParquetWriterFactory<>(out ->
> AvroParquetWriter.builder(out)
> .withSchema(...)
> .withDataModel(...)
> .withCompressionCodec(...)
> .build()))
> .build()
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html
>
> [2]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
> On 26/01/2019 08:18, Taher Koitawala wrote:
>
> Can someone please help with this?
>
> On Fri 25 Jan, 2019, 1:47 PM Taher Koitawala  wrote:
>
>> Hi All,
>>  Is there a way to specify *batch size* and *compression* properties
>> when using StreamingFileSink, just like we did with the bucketing sink? The
>> only parameters it accepts are the inactivity bucket check interval and the
>> Avro schema.
>>
>>   We have numerous Flink jobs pulling data from the same Kafka
>> topics but doing different operations, and each Flink job is writing files
>> of a different size; we would want to make that consistent.
>>
>>
>> Regards,
>> Taher Koitawala
>> GS Lab Pune
>> +91 8407979163
>>
>


Re: StreamingFileSink Avro batch size and compression

2019-01-26 Thread Dawid Wysakowicz
Hi,

Generally speaking you can pass the batch size through RollingPolicy[1].
Unfortunately bulk formats use the OnCheckpointRollingPolicy and AFAIK it
does not allow adjusting its behavior based on part size. Maybe Kostas has an
idea how to do that in the least invasive way. For non-bulk formats you can
have a look at [2].
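
For a row format that would be roughly the following (just a sketch; the path,
encoder and thresholds are placeholders):

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.create()
            .withMaxPartSize(128 * 1024 * 1024)      // roll at ~128 MB
            .withRolloverInterval(15 * 60 * 1000)    // or every 15 minutes
            .withInactivityInterval(5 * 60 * 1000)   // or after 5 minutes of inactivity
            .build())
    .build();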

I assume that you were using AvroParquetWriter. You can specify the
compression on the ParquetWriter, I guess the same way as before. The
code for doing it can look something like this:

StreamingFileSink.forBulkFormat(
    Path.fromLocalFile(folder),
    new ParquetWriterFactory<>(out ->
        AvroParquetWriter.builder(out)
            .withSchema(...)
            .withDataModel(...)
            .withCompressionCodec(...)
            .build()))
    .build()

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html

[2]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java

On 26/01/2019 08:18, Taher Koitawala wrote:
> Can someone please help with this?
>
> On Fri 25 Jan, 2019, 1:47 PM Taher Koitawala
> <taher.koitaw...@gslab.com> wrote:
>
> Hi All,
>          Is there a way to specify /batch size/ and /compression/
> properties when using StreamingFileSink, just like we did with the
> bucketing sink? The only parameters it accepts are the inactivity
> bucket check interval and the Avro schema.
>
>           We have numerous Flink jobs pulling data from the same
> Kafka topics but doing different operations, and each Flink
> job is writing files of a different size; we would want to
> make that consistent.
>
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>




Re: Best pattern for achieving stream enrichment (side-input) from a large static source

2019-01-26 Thread Ken Krugler
Hi Nimrod,

One approach is as follows…

1. For the Parquet data, you could use a ContinuousFileMonitoringFunction to
generate a stream of enrichment records.

2. Then use a CoMapFunction to “merge” the two connected streams (Kafka and 
Parquet), and output something like Tuple2>

3. In your enrichment function, based on what’s in the Either<> you’re either 
updating your enrichment state, or processing a record from Kafka.

But I think you might want to add a stateful function in the Parquet stream, so 
that you can separately track Kafka record state in the enrichment function.

There’s also the potential issue of wanting to buffer Kafka data in the 
enrichment function, if you need to coordinate with the enrichment data (e.g. 
you need to get a complete set of updated enrichment data before applying any 
of it to the incoming Kafka data).
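
To make steps 2 and 3 a bit more concrete, the wiring could look roughly like
this (a sketch only, with String payloads keyed by Tuple2.f0 standing in for
your real record types, and StatefulEnricher being a hypothetical stateful
RichMapFunction of your own):

// merge the two streams into one stream of Either, then key it and hand it
// to a stateful enrichment function
DataStream<Tuple2<String, Either<String, String>>> merged = parquetStream
    .connect(kafkaStream)
    .map(new CoMapFunction<Tuple2<String, String>, Tuple2<String, String>,
                           Tuple2<String, Either<String, String>>>() {
        @Override
        public Tuple2<String, Either<String, String>> map1(Tuple2<String, String> enrichment) {
            return Tuple2.of(enrichment.f0, Either.<String, String>Left(enrichment.f1));
        }

        @Override
        public Tuple2<String, Either<String, String>> map2(Tuple2<String, String> event) {
            return Tuple2.of(event.f0, Either.<String, String>Right(event.f1));
        }
    });

merged.keyBy(t -> t.f0)
      .map(new StatefulEnricher());  // checks the Either per element: update state or enrich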

— Ken



> On Jan 26, 2019, at 8:04 AM, Nimrod Hauser  
> wrote:
> 
> Hello,
> 
> We're using Flink on a high velocity data-stream, and we're looking for the 
> best way to enrich our stream using a large static source (originating from 
> Parquet files, which are rarely updated).
> 
> The source for the enrichment weighs a few GBs, which is why we want to
> avoid using techniques such as broadcast streams, which cannot be keyed and 
> need to be duplicated for every Flink operator that is used.
> 
> We started looking into the possibility of merging streams with datasets, or 
> using the Table API, but any best-practice that's been done before will be 
> greatly appreciated.
> 
> I'm attaching a simple chart for convenience,
> 
> Thank you very much,
> 
> Nimrod.
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Adding flink udf support to linkedin's portable udf framework transport

2019-01-26 Thread Dawid Wysakowicz
Hi Arup,

Thanks for bringing our attention to that project. So far I am not aware
of anyone working on adding Flink UDF support to that project, but
that would definitely be worth investigating. Just as a thought, it
might make sense to wait a little bit with that effort, as the
type system for Flink's Table & SQL API will probably be updated in the
near future as part of the effort to restructure the Table API & SQL for
future contributions[1].

Best,

Dawid

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-32-Restructure-flink-table-for-future-contributions-td26543.html

On 24/01/2019 19:59, Arup Malakar wrote:
> Hi Flink Users,
>
> Came across the Transport project from
> LinkedIn (https://github.com/linkedin/transport). I think the project
> has great potential, as it allows sharing UDF implementations across
> various compute engines (Hive/Spark/Presto). Any thoughts on adding
> support for Flink UDFs to Transport, or is anyone already working on it?
>
> -- 
> Arup Malakar




Re: getting duplicate messages from duplicate jobs

2019-01-26 Thread Dawid Wysakowicz
Forgot to cc Gordon :)

On 23/01/2019 18:02, Avi Levi wrote:
> Hi, 
> This is quite confusing.
> I submitted the same stateless job twice (actually I uploaded it once).
> However, when I place a message on Kafka, it seems that both jobs
> consume it and publish the same result (we publish the result to
> another Kafka topic, so I actually see the message duplicated on
> Kafka). How can that be? Both jobs are using the same group id (the
> group id is fixed and not generated).
>
> Kind regards
> Avi





Re: getting duplicate messages from duplicate jobs

2019-01-26 Thread Dawid Wysakowicz
Hi Avi,

AFAIK Flink's Kafka consumer uses the low-level Kafka APIs and does not
participate in Kafka's partition assignment protocol. Instead, it
discovers all available partitions for the given topic and manages offsets
itself, which allows it to provide exactly-once guarantees with regard to
Flink's internal state.

Flink's Kafka consumer uses the group.id to derive starting offsets for
partitions, and it can also commit offsets back to Kafka for monitoring
purposes[1]. But as I said, it does not participate in partition
assignment within a group, so it might happen that the same partition
will be read by multiple consumers with the same group.id.
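
To illustrate (just a sketch; the topic, brokers and group name are
placeholders, and `env` is assumed to be your StreamExecutionEnvironment):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "my-job");  // used for start offsets / offset commits only

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

// Flink assigns the topic's partitions to its own parallel subtasks internally,
// so two Flink jobs with the very same group.id each still consume every partition.
DataStream<String> stream = env.addSource(consumer);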

I'm adding Gordon as a cc to correct me if I am wrong.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

On 23/01/2019 18:02, Avi Levi wrote:
> Hi, 
> This is quite confusing.
> I submitted the same stateless job twice (actually I uploaded it once).
> However, when I place a message on Kafka, it seems that both jobs
> consume it and publish the same result (we publish the result to
> another Kafka topic, so I actually see the message duplicated on
> Kafka). How can that be? Both jobs are using the same group id (the
> group id is fixed and not generated).
>
> Kind regards
> Avi





K8s and flink1.7.1

2019-01-26 Thread Vishal Santoshi
There are two issues with the 1.7.1 "job as a cluster" setup that I need
guidance on.

1. In the HA setup, the TMs are not able to resolve the job manager's random
port through the jobmanager.rpc.port
setting. The setting does work in the non-HA mode (the containerPort
/TCP with the same port facilitates that), but then we lose the job if
the JM were to reboot. This is a high priority for us, and I am sure there is
a workaround, but I would rather ask the experts.

2. The metrics on the JM are not visible, possibly due to
https://issues.apache.org/jira/browse/FLINK-11127 . It is an open issue and
both the service-per-TM and the stateful-set approach appear non-production-ready
(not scalable and kludgey). Do you have a timeline for when these will be
resolved?

Thanks.

Vishal


Re: Query on retract stream

2019-01-26 Thread Gagan Agrawal
Thanks Hequn for the suggested solutions; I think this should really work,
and I will give it a try. As I understand it, the first solution of using
multiple windows will be good for scenarios where I want output to be
generated once the window is materialized (i.e. the watermark reaches the end
of the window), and the second will be good if I want it fired on a per-event
basis (i.e. no watermarking). Apart from this, do you see any difference
from a performance perspective in choosing between the two, or should both be
equally performant?

Gagan

On Sat, Jan 26, 2019 at 11:50 AM Hequn Cheng  wrote:

> Hi Gagan,
>
> Time attribute fields will be materialized by the unbounded groupby. Also,
> currently, the window doesn't have the ability to handle retraction
> messages. I see two ways to solve the problem.
>
> - Use multi-window.  The first window performs lastValue, the second
> performs count.
> - Use two non-window aggregates. In this case, you don't have to change
> anything for the first aggregate. For the second one, you can group by an
> hour field and perform count(). The code looks like:
>
> SELECT userId,
>  count(orderId)
> FROM
> (SELECT orderId,
>  getHour(orderTime) as myHour,
>  lastValue(userId) AS userId,
>  lastValue(status) AS status
> FROM orders
> GROUP BY  orderId, orderTime)
> WHERE status='PENDING'
> GROUP BY myHour, userId
>
> Best,
> Hequn
>
>
>
>
> On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal 
> wrote:
>
>> Based on the suggestions in this mail thread, I tried out a few experiments
>> on upsert streams with Flink 1.7.1, and here is the issue I am facing with
>> the window stream.
>>
>> *1. Global Pending order count. *
>> Following query works fine and it's able to handle updates as per
>> original requirement.
>>
>> select userId, count(orderId) from
>> (select orderId, lastValue(userId) as userId, lastValue(status) as status
>> from orders group by orderId)
>> where status='PENDING' group by userId
>>
>> *2. Last 1 Hour tumbling window count (Append stream)*
>> Though following query doesn't handle upsert stream, I just tried to make
>> sure time column is working fine. This is working, but as expected, it
>> doesn't handle updates on orderId.
>>
>> select userId, count(orderId) from orders
>> where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR),
>> userId
>>
>> 3. *Last 1 Hour tumbling window count (With upsert stream)*
>> Now I tried combination of above two where input stream is converted to
>> upsert stream (via lastValue aggregate function) and then Pending count
>> needs to be calculated in last 1 hour window.
>>
>> select userId, count(orderId) from
>> (select orderId, orderTime, lastValue(userId) as userId,
>> lastValue(status) as status from orders group by orderId, orderTime)
>> where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR),
>> userId
>>
>> This one gives me the following error. Is this because I have added orderTime
>> to the group by/select clause and hence its time characteristics have changed?
>> What is the workaround here, as without adding orderTime I cannot perform
>> window aggregation on the upsert stream?
>>
>> [error] Exception in thread "main"
>> org.apache.flink.table.api.ValidationException:* Window can only be
>> defined over a time attribute column.*
>> [error] at
>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
>> [error] at
>> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
>> [error] at
>> org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
>> [error] at
>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
>> [error] at
>> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
>> [error] at
>> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
>> [error] at
>> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
>> [error] at
>> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>> [error] at
>> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
>> [error] at
>> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
>> [error] at
>> org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
>> [error] at
>> org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
>> [error] at
>> org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
>> [error] at
>> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:8