can flink sql handle udf-generated timestamp field

2019-06-04 Thread Yu Yang
Hi,

I am trying to use Flink SQL to do an aggregation on a hopping window. In the
data stream, we store the timestamp as a long, so I wrote a UDF
'FROM_UNIXTIME' to convert the long value to the Timestamp type.

  public static class TimestampModifier extends ScalarFunction {
    public Timestamp eval(long t) {
      return new Timestamp(t);
    }

    public TypeInformation<?> getResultType(Class<?>[] signature) {
      return Types.SQL_TIMESTAMP;
    }
  }

With the above UDF, I wrote the following query, and ran into
 "ProgramInvocationException: The main method caused an error: Window can
only be defined over a time attribute column".
Any suggestions on how to resolve this issue? I am using Flink 1.8 for this
experiment.

My SQL query:

select keyid, sum(value)
from (
  select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
  from orders)
group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
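
(For context: in Flink 1.8, group windows can only be defined over a time
attribute that is declared when the stream/table is registered; a timestamp
produced by a scalar UDF inside the query is treated as a regular column. A
minimal Java sketch of the usual workaround, assuming the stream is registered
through the Table API; the Order type and field names are illustrative:)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch only: assign event timestamps/watermarks on the DataStream and declare
// the rowtime attribute at registration time, instead of deriving it with a UDF.
DataStream<Order> withTs = orderStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(Order order) {
            return order.timestampMs; // epoch millis carried in the record
        }
    });

// "ts.rowtime" marks ts as an event-time attribute, so HOP(ts, ...) becomes legal.
tableEnv.registerDataStream("orders", withTs, "key, value, timestampMs, ts.rowtime");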

flink exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Window can only be defined over a time attribute column.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.ValidationException: Window can only
be defined over a time attribute column.
at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
at
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
at
org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)

Regards,
-Yu


Re: count(DISTINCT) in flink SQL

2019-06-04 Thread Vinod Mehra
To be clear, I want the outer grouping to have a longer retention time (on
the order of a week or a month - for which we are using 'idle state retention
time') and the inner grouping to have a shorter retention period (1 hour max).
So I am hoping the session window will do the right thing.

Thanks,
Vinod
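
(For reference, the idle state retention mentioned above is configured per
query through the StreamQueryConfig; a minimal Java sketch against the Flink
1.8 Table API docs linked further down in this thread; the retention values and
the query itself are illustrative:)

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Sketch only: state for a grouping key that stays inactive longer than the
// maximum retention time is cleaned up; min and max must differ by >= 5 minutes.
StreamQueryConfig qConfig = tableEnv.queryConfig();
qConfig.withIdleStateRetentionTime(Time.days(7), Time.days(8));

Table result = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(event_id) AS event_count FROM event_foo GROUP BY user_id");
tableEnv.toRetractStream(result, Row.class, qConfig).print();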

On Tue, Jun 4, 2019 at 5:14 PM Vinod Mehra  wrote:

> Thanks a lot Fabian for the detailed response. I know all the duplicates
> are going to arrive within an hour max of the actual event. So using a 1
> hour running session window should be fine for me.
>
> Is the following the right way to do it in apache-flink-1.4.2?
>
> SELECT
>   CONCAT_WS(
>     '-',
>     CAST(event_month AS VARCHAR),
>     CAST(event_year AS VARCHAR),
>     CAST(user_id AS VARCHAR)
>   ),
>   COUNT(event_id) AS event_count
> FROM (
>   SELECT
>     user_id,
>     event_id,
>     MAX(MONTH(longToDateTime(rowtime))) as event_month,
>     MAX(YEAR(longToDateTime(rowtime))) as event_year
>   FROM event_foo
>   GROUP BY event_id, user_id, SESSION(rowtime, INTERVAL '1' HOUR) -- 1 hour running session window
> )
> GROUP BY user_id, event_month, event_year
>
>
>
> We are also using idle state retention time to clean up unused state, but
> that is much longer (a week or a month depending on the use case). We will
> switch to count(DISTINCT) as soon as we move to a newer Flink version. So the
> above nested select is going to be a stop-gap until then.
>
> Thanks,
> Vinod
>
> On Mon, Jun 3, 2019 at 4:52 AM Fabian Hueske  wrote:
>
>> Hi Vinod,
>>
>> IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released
>> August 9th, 2018) [1].
>> Also note that by default, this query will accumulate more and more
>> state, i.e., for each grouping key it will hold all unique event_ids.
>> You could configure an idle state retention time to clean up unused state.
>>
>> Regarding the boundaries, with the current query they are fixed to one
>> month and sharply cut (as one would expect).
>> You could try to use a long running session window [3]. This would also
>> remove the need for the idle state configuration because Flink would know
>> when state can be discarded.
>>
>> Hope this helps,
>> Fabian
>>
>> [1]
>> https://flink.apache.org/news/2018/08/09/release-1.6.0.html#enhancing-sql-and-table-api
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/query_configuration.html#idle-state-retention-time
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#group-windows
>>
>> On Thu, May 30, 2019 at 02:18, Vinod Mehra wrote:
>>
>>> Another interesting thing is that if I add DISTINCT in the 2nd query it
>>> doesn't complain. But it is a no-op, because the inner select is already
>>> doing the deduping:
>>>
>>> SELECT
>>>   CONCAT_WS(
>>>     '-',
>>>     CAST(MONTH(row_datetime) AS VARCHAR),
>>>     CAST(YEAR(row_datetime) AS VARCHAR),
>>>     CAST(user_id AS VARCHAR)
>>>   ),
>>>   COUNT(DISTINCT(event_id)) AS event_count -- note the DISTINCT keyword here. Flink doesn't barf for this.
>>> FROM (
>>>   SELECT
>>>     user_id,
>>>     event_id,
>>>     maxtimestamp(longToDateTime(rowtime)) as row_datetime
>>>   FROM event_foo
>>>   GROUP BY event_id, user_id
>>> )
>>> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>>
>>> On Wed, May 29, 2019 at 5:15 PM Vinod Mehra  wrote:
>>>
 More details on the error with query#1 that used COUNT(DISTINCT()):

 org.apache.flink.table.api.TableException: Cannot generate a valid
 execution plan for the given query:

 FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
 expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
 "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
 SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
 expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
 "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
 EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
 lower_boundary=[$t3], latency_marker=[$t4])
   FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
 DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
 FlinkLogicalAggregate(group=[{0, 1, 2}],
 lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
   FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
 expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
 expr#10=[Reinterpret($t9)], expr#11=[8640], expr#12=[/INT($t10, $t11)],
 expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
 expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
 expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#

Re: error in flink

2019-06-04 Thread Yang Wang
Hi, yuvraj singh

The possible reason may be that you have reached the Linux system limit on
max user processes. You can confirm this with the "ulimit -a" command, and
"ulimit -u 32000" can be used to override the default value.
Please make sure the user you run these commands as is the same user that
runs the TaskManager container.

yuvraj singh <19yuvrajsing...@gmail.com> wrote on Tue, Jun 4, 2019 at 6:52 PM:

>
> Hi all ,
>
> I am having a problem. I was running a job, then I submitted one more job
> on the same cluster, and my old job started failing with:
>
> 2019-06-04 15:12:11,593 ERROR org.apache.flink.yarn.YarnResourceManager - 
> Could not start TaskManager in container.
> java.lang.OutOfMemoryError: unable to create new native thread
>
> please help me
>
> Thanks
>
>


Re: count(DISTINCT) in flink SQL

2019-06-04 Thread Vinod Mehra
Thanks a lot Fabian for the detailed response. I know all the duplicates
are going to arrive within an hour max of the actual event. So using a 1
hour running session window should be fine for me.

Is the following the right way to do it in apache-flink-1.4.2?

SELECT
  CONCAT_WS(
    '-',
    CAST(event_month AS VARCHAR),
    CAST(event_year AS VARCHAR),
    CAST(user_id AS VARCHAR)
  ),
  COUNT(event_id) AS event_count
FROM (
  SELECT
    user_id,
    event_id,
    MAX(MONTH(longToDateTime(rowtime))) as event_month,
    MAX(YEAR(longToDateTime(rowtime))) as event_year
  FROM event_foo
  GROUP BY event_id, user_id, SESSION(rowtime, INTERVAL '1' HOUR) -- 1 hour running session window
)
GROUP BY user_id, event_month, event_year



We are also using idle state retention time to clean up unused state, but
that is much longer (a week or a month depending on the use case). We will
switch to count(DISTINCT) as soon as we move to a newer Flink version. So the
above nested select is going to be a stop-gap until then.

Thanks,
Vinod
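
(For reference, on Flink 1.6+ the deduplicating inner select can collapse into
a DISTINCT aggregate; a hedged Java sketch reusing the table, UDF, and field
names from the query above:)

import org.apache.flink.table.api.Table;

// Sketch only: COUNT(DISTINCT ...) replaces the nested GROUP BY used as a stop-gap above.
Table monthlyCounts = tableEnv.sqlQuery(
    "SELECT user_id, " +
    "       MONTH(longToDateTime(rowtime)) AS event_month, " +
    "       YEAR(longToDateTime(rowtime))  AS event_year, " +
    "       COUNT(DISTINCT event_id)       AS event_count " +
    "FROM event_foo " +
    "GROUP BY user_id, MONTH(longToDateTime(rowtime)), YEAR(longToDateTime(rowtime))");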

On Mon, Jun 3, 2019 at 4:52 AM Fabian Hueske  wrote:

> Hi Vinod,
>
> IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released
> August 9th, 2018) [1].
> Also note that by default, this query will accumulate more and more state,
> i.e., for each grouping key it will hold all unique event_ids.
> You could configure an idle state retention time to clean up unused state.
>
> Regarding the boundaries, with the current query they are fixed to one
> month and sharply cut (as one would expect).
> You could try to use a long running session window [3]. This would also
> remove the need for the idle state configuration because Flink would know
> when state can be discarded.
>
> Hope this helps,
> Fabian
>
> [1]
> https://flink.apache.org/news/2018/08/09/release-1.6.0.html#enhancing-sql-and-table-api
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/query_configuration.html#idle-state-retention-time
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#group-windows
>
> On Thu, May 30, 2019 at 02:18, Vinod Mehra wrote:
>
>> Another interesting thing is that if I add DISTINCT in the 2nd query it
>> doesn't complain. But it is a no-op, because the inner select is already
>> doing the deduping:
>>
>> SELECT
>>   CONCAT_WS(
>>     '-',
>>     CAST(MONTH(row_datetime) AS VARCHAR),
>>     CAST(YEAR(row_datetime) AS VARCHAR),
>>     CAST(user_id AS VARCHAR)
>>   ),
>>   COUNT(DISTINCT(event_id)) AS event_count -- note the DISTINCT keyword here. Flink doesn't barf for this.
>> FROM (
>>   SELECT
>>     user_id,
>>     event_id,
>>     maxtimestamp(longToDateTime(rowtime)) as row_datetime
>>   FROM event_foo
>>   GROUP BY event_id, user_id
>> )
>> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>
>> On Wed, May 29, 2019 at 5:15 PM Vinod Mehra  wrote:
>>
>>> More details on the error with query#1 that used COUNT(DISTINCT()):
>>>
>>> org.apache.flink.table.api.TableException: Cannot generate a valid
>>> execution plan for the given query:
>>>
>>> FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
>>> expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>>> "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
>>> SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
>>> expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>>> "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
>>> EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
>>> lower_boundary=[$t3], latency_marker=[$t4])
>>>   FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
>>> DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
>>> FlinkLogicalAggregate(group=[{0, 1, 2}],
>>> lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
>>>   FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>>> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
>>> expr#10=[Reinterpret($t9)], expr#11=[8640], expr#12=[/INT($t10, $t11)],
>>> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
>>> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
>>> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
>>> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
>>> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
>>>
>>> FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>>> FlinkLogicalAggregate(group=[{0, 1, 2}],
>>> mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
>>>   FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
>>> FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>>> expr#8=[CAST

Re: Flink 1.8

2019-06-04 Thread Vishal Santoshi
Based on where this line of code is, it is hard to get the full stack
trace, since

LOG.error("Cannot update accumulators for job {}.", getJobID(), e);

does not give us the full stack trace.

It is true, though, that the Accumulator did not have a serialVersionUID. I
will double-check. I would think, though, that this should not be a recoverable
error?





On Tue, Jun 4, 2019 at 4:32 PM Timothy Victor  wrote:

> It's hard to tell without more info.
>
> From the method that threw the exception it looked like it was trying to
> deserialize the accumulator.   By any chance did you change your
> accumulator class but forgot to update the serialVersionUID?  Just
> wondering if it is trying to deserialize to a different class definition.
>
> A more detailed stack trace (maybe with debug on) will help.
>
> Tim
>
> On Tue, Jun 4, 2019, 8:41 PM Vishal Santoshi 
> wrote:
>
>> I see tons of
>>
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Cannot
>> update accumulators for job 7bfe57bb0ed1c5c2f4f40c2fccaab50d.
>>
>> java.lang.NullPointerException
>>
>>
>>
>>
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1715
>>
>> Not sure why it is tagged ERROR and the pipe does not abort or what this
>> implies.
>>
>


Re: Flink 1.8

2019-06-04 Thread Timothy Victor
It's hard to tell without more info.

From the method that threw the exception it looked like it was trying to
deserialize the accumulator.   By any chance did you change your
accumulator class but forgot to update the serialVersionUID?  Just
wondering if it is trying to deserialize to a different class definition.

A more detailed stack trace (maybe with debug on) will help.

Tim
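
(For reference, a minimal sketch of the suggestion above: pin an explicit
serialVersionUID on custom accumulators so a redeployed jar can still
deserialize accumulator snapshots from the running job. The accumulator below
is illustrative, not the one from this thread:)

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;

public class MaxLatencyAccumulator implements SimpleAccumulator<Long> {
    // Pin the UID explicitly; an implicit one changes whenever the class shape changes.
    private static final long serialVersionUID = 1L;

    private long max = Long.MIN_VALUE;

    @Override public void add(Long value) { max = Math.max(max, value); }
    @Override public Long getLocalValue() { return max; }
    @Override public void resetLocal() { max = Long.MIN_VALUE; }
    @Override public void merge(Accumulator<Long, Long> other) {
        max = Math.max(max, other.getLocalValue());
    }
    @Override public MaxLatencyAccumulator clone() {
        MaxLatencyAccumulator copy = new MaxLatencyAccumulator();
        copy.max = max;
        return copy;
    }
}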

On Tue, Jun 4, 2019, 8:41 PM Vishal Santoshi 
wrote:

> I see tons of
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Cannot
> update accumulators for job 7bfe57bb0ed1c5c2f4f40c2fccaab50d.
>
> java.lang.NullPointerException
>
>
>
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1715
>
> Not sure why it is tagged ERROR and the pipe does not abort or what this
> implies.
>


Re: Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

2019-06-04 Thread Vishal Santoshi
The above is flink 1.8

On Tue, Jun 4, 2019 at 12:32 PM Vishal Santoshi 
wrote:

> I had a sequence of events that created this issue.
>
> * I started a job and I had the state.checkpoints.num-retained: 5
>
> * As expected I have 5 latest checkpoints retained in my hdfs backend.
>
>
> * JM dies (K8s limit etc.) without cleaning the hdfs directory. The k8s
> job restores from the latest checkpoint (I think), but as it creates new
> checkpoints it does not delete the older checkpoints. At the end there are
> now 10 checkpoints, 5 from the old run which remain static and 5 latest
> representing the ongoing pipe.
>
> * The JM dies again and restarts from the latest of the 5 old
> checkpoints.
>
> This looks like a bug in the Job Cluster implementation of Flink. It looks like
> it is taking the 5th checkpoint from the beginning based on the num-retained
> value. Note that it has the same job id and does not scope to a new
> directory.
>
>
> https://github.com/apache/flink/blob/1dfdaa417ab7cdca9bef1efe6381c7eb67022aaf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L109
>
> Please tell me if this does not make sense.
>
> Vishal
>
>
>
>
>
>


Flink 1.8

2019-06-04 Thread Vishal Santoshi
I see tons of

org.apache.flink.runtime.executiongraph.ExecutionGraph- Cannot
update accumulators for job 7bfe57bb0ed1c5c2f4f40c2fccaab50d.

java.lang.NullPointerException




https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1715

Not sure why it is tagged ERROR and the pipe does not abort or what this
implies.


Does Flink Kafka connector has max_pending_offsets concept?

2019-06-04 Thread wang xuchen
Hi Flink users,

When # of Kafka consumers = # of partitions, and I use setParallelism(>1),
something like this:

'messageStream.rebalance().map(lambda).setParallelism(3).print()'

How do I tune the # of outstanding uncommitted offsets? Something similar to

https://storm.apache.org/releases/1.1.2/storm-kafka-client.html in Storm.

Thanks
Ben
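
(For context: as far as I know, Flink's Kafka connector has no direct
equivalent of Storm's max-uncommitted-offsets setting. With checkpointing
enabled, offsets are committed to Kafka as part of checkpoint completion and
in-flight data is bounded by backpressure, so the checkpoint interval is the
closest tuning handle. A hedged Java sketch, assuming the universal Kafka
connector; broker, group, and topic names are illustrative:)

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30_000); // offsets are committed back to Kafka on each completed checkpoint

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092"); // illustrative
props.setProperty("group.id", "my-group");             // illustrative

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
consumer.setCommitOffsetsOnCheckpoints(true); // the default, shown for clarity

env.addSource(consumer).rebalance().print();
env.execute("kafka-offset-commit-example");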


Re: Clean way of expressing UNNEST operations

2019-06-04 Thread Piyush Narang
Hi Jingsong,

Thanks for getting back. I’ll try and hook up the UDTF.

I added a unit test which catches the issue I’m running into (I tested this 
against Flink 1.6 which is what we’re running as well as latest master). Did 
you have to do anything in particular to hook up the type correctly?

Error I get is: “Caused by: org.apache.calcite.runtime.CalciteContextException: 
From line 1, column 114 to line 1, column 120: List of column aliases must have 
same degree as table; table has 3 columns ('price', 'quantity', 'externalId'), 
whereas alias list has 1 columns”

@Test
def testArrayOfRow(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.clear

  class Event(t0: Int, t1: Array[Row]) extends org.apache.flink.api.java.tuple.Tuple2[Int, Array[Row]](t0, t1)
  def row(values: Any*): Row = Row.of(values.map(_.asInstanceOf[AnyRef]):_*)

  val rowType = Types.ROW(fieldNames = Array("price", "quantity", "externalId"), Array(Types.DOUBLE, Types.INT, Types.INT))
  implicit val typeInfo = new TupleTypeInfo[Event](Types.INT, Types.OBJECT_ARRAY(rowType))

  val myArr1 = Array[Row](row(12.45, 10, 1))
  val myArr2 = Array[Row](row(10.0, 1, 1), row(20.0, 1, 1))
  val myArr3 = Array[Row](row(12.45, 10, 1))

  val input = env.fromElements[Event](
    new Event(123, myArr1),
    new Event(123, myArr2),
    new Event(456, myArr3)
  )

  tEnv.registerDataStream[Event]("advertiser_event", input, 'partnerId, 'products)

  val table = tEnv.sqlQuery("SELECT partnerId, product.price, product.quantity FROM advertiser_event, UNNEST(advertiser_event.products) AS t (product) ")
  table.toAppendStream[Row](table.getSchema.toRowType).print()
}

When I list out all three fields instead of t(product), I don’t face the issue..

Thanks,

-- Piyush


From: JingsongLee 
Reply-To: JingsongLee 
Date: Tuesday, June 4, 2019 at 2:42 AM
To: JingsongLee , Piyush Narang , 
"user@flink.apache.org" 
Subject: Re: Clean way of expressing UNNEST operations

Hi @Piyush Narang

I tried again: if the type of advertiser_event.products is derived correctly
(ObjectTypeInfo(RowTypeInfo(fields...))), it will work. See more information in
the Calcite code: SqlUnnestOperator.inferReturnType.
So I think maybe your type is not passed to the engine correctly.

Best, JingsongLee

--
From:JingsongLee 
Send Time: Tuesday, June 4, 2019 13:35
To:Piyush Narang ; user@flink.apache.org 

Subject:Re: Clean way of expressing UNNEST operations

Hi @Piyush Narang
It seems that Calcite's type inference is not perfect, and the fields of return 
type can not be inferred in UNNEST. (Errors were reported during the Calcite 
Validate phase.)

But a UDTF supports this usage, and if it's convenient, you might consider
writing a UDTF with similar UNNEST functionality to try it out. (Use JOIN
LATERAL TABLE.)

Best, JingsongLee
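
(A hedged Java sketch of that UDTF workaround; the function name, registration,
and query below are illustrative, and the row type is declared explicitly
because Row fields cannot be inferred by reflection:)

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class ExplodeProducts extends TableFunction<Row> {

    public void eval(Row[] products) {
        if (products == null) {
            return;
        }
        for (Row product : products) {
            collect(product); // one output row per array element
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        // Matches the product schema used in the test above.
        return Types.ROW_NAMED(
            new String[] {"price", "quantity", "externalId"},
            Types.DOUBLE, Types.INT, Types.INT);
    }
}

// Registration and usage (JOIN LATERAL TABLE, as suggested above):
//   tEnv.registerFunction("explodeProducts", new ExplodeProducts());
//   SELECT partnerId, t.price, t.quantity
//   FROM advertiser_event, LATERAL TABLE(explodeProducts(products)) AS t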

--
From:Piyush Narang 
Send Time:2019年6月4日(星期二) 00:20
To:user@flink.apache.org 
Subject:Clean way of expressing UNNEST operations

Hi folks,

I’m using the SQL API and trying to figure out the best way to unnest and 
operate on some data.
My data is structured as follows:
Table:
Advertiser_event:

  *   PartnerId: Int
  *   Products: Array< Row< price: Double, quantity: Int, … > >
  *   …

I’m trying to unnest the products array and then compute something on a couple 
of fields in the product row (e.g. price * quantity)

My query looks like this:
SELECT partnerId, price, quantity FROM advertiser_event, 
UNNEST(advertiser_event.products) AS t (price, quantity, field3, field4, …)

My issue is that when I try to unnest this array I need to 
specify all the fields in the temp table as part of the unnest (“t” above). If 
I don’t, I get an error saying the number of fields doesn’t match what is 
expected. This makes my query a bit fragile in case additional fields are added 
/ removed from this product structure.

Does anyone know if there’s a way around this? As a contrast on an engine like 
Presto, the unnest operation would yield a ‘product’ row type which I can then 
use to pick the fields I want “product.price”, “product.quantity”.
Presto query:
SELECT partnerId, product.price, product.quantity FROM advertiser_event CROSS 
JOIN UNNEST(products) AS product

Thanks,

-- Piyush



Job recovers from an old dangling CheckPoint in case of Job Cluster based Flink pipeline

2019-06-04 Thread Vishal Santoshi
I had a sequence of events that created this issue.

* I started a job and I had the state.checkpoints.num-retained: 5

* As expected I have 5 latest checkpoints retained in my hdfs backend.


* JM dies (K8s limit etc.) without cleaning the hdfs directory. The k8s
job restores from the latest checkpoint (I think), but as it creates new
checkpoints it does not delete the older checkpoints. At the end there are
now 10 checkpoints, 5 from the old run which remain static and 5 latest
representing the ongoing pipe.

* The JM dies again and restarts from the latest of the 5 old checkpoints.

This looks like a bug in the Job Cluster implementation of Flink. It looks like
it is taking the 5th checkpoint from the beginning based on the num-retained
value. Note that it has the same job id and does not scope to a new
directory.

https://github.com/apache/flink/blob/1dfdaa417ab7cdca9bef1efe6381c7eb67022aaf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L109

Please tell me if this does not make sense.

Vishal


Re: Flink job server with HA

2019-06-04 Thread Boris Lublinsky
And it works now.
My mistake

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Jun 3, 2019, at 10:18 PM, Xintong Song  wrote:
> 
> If that is the case, then I would suggest you to check the following two 
> things:
> 1. Is the HA mode configured properly in Flink configuration? There should be 
> a config option "high-availability" in your flink-conf.yaml. If not 
> configured, the default value would be "NONE".
> 2. Is "ClassPathJobGraphRetriever#retrieveJobGraph" actually invoked, and are 
> there any exceptions thrown from it? This is to verify whether the correct 
> code path for the job cluster is invoked.
> 
> Thank you~
> Xintong Song
> 
> 
> On Tue, Jun 4, 2019 at 10:48 AM Boris Lublinsky
> <boris.lublin...@lightbend.com> wrote:
> I am running on k8
> Job master runs as a deployment of 1, so just killing a pod restarts it
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 
>> On Jun 3, 2019, at 9:46 PM, Xintong Song wrote:
>> 
>> So here are my questions:
>> 1. What environment do you run Flink in? Is it locally, on Yarn or Mesos?
>> 2. How do you trigger "restart a Job Master"?
>> 
>> Thank you~
>> Xintong Song
>> 
>> 
>> On Tue, Jun 4, 2019 at 10:35 AM Boris Lublinsky
>> <boris.lublin...@lightbend.com> wrote:
>> Thanks,
>> Thats what I thought initially.
>> The issue is that because of this, during restart, it does not know which 
>> job was running before (it is obtained from submitted job graph store).
>> Because this is empty, there is no restarted jobs and the cluster does not 
>> even try to restore checkpoints.
>> I can see that checkpoints are stored correctly, but they are never accessed.
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com 
>> https://www.lightbend.com/ 
>>> On Jun 3, 2019, at 9:23 PM, Xintong Song wrote:
>>> 
>>> Hi Boris,
>>> 
>>> I think what you described that putJobGraph is not invoked in Flink job 
>>> cluster is by design and should not cause a failure of job recovering. For 
>>> a Flink job cluster, there is only one job graph to execute. Instead of 
>>> uploading job graph to an already running cluster (like in a session 
>>> cluster), the job graph in a Flink job cluster is uploaded before the 
>>> cluster is started, together with the Flink framework jars. Please refer to 
>>> MiniDispatcher and SingleJobSubmittedJobGraphStore for the details.
>>> 
>>> I think we need more information to find the root cause of your problem. 
>>> For example, can you explain what are the detailed operation steps do you 
>>> perform when you say "trying to restart a Job Master".
>>> 
>>> Thank you~
>>> Xintong Song
>>> 
>>> 
>>> On Mon, Jun 3, 2019 at 10:05 PM Boris Lublinsky
>>> <boris.lublin...@lightbend.com> wrote:
>>> I am trying to experiment with Flink Job server with HA and I am noticing, 
>>> that in this case
>>> method putJobGraph in the class SubmittedJobGraphStore Is never invoked. (I 
>>> can see that it is invoked in the case of session cluster when a job is 
>>> added)
>>> As a result, when I am trying to restart a Job Master, it finds no running 
>>> jobs and is not trying to restore it.
>>> Am I missing something?
>>> 
>>>  
>>> 
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com 
>>> https://www.lightbend.com/ 
>> 
> 



Re: What does flink session mean ?

2019-06-04 Thread Till Rohrmann
Yes, interactive programming solves the problem by storing the meta
information on the client, whereas in the past we considered whether to keep
the information on the JM. But that would not allow sharing results
between different clusters. Thus, the interactive programming approach is a
bit more general, I think.

Cheers,
Till

On Tue, Jun 4, 2019 at 11:13 AM Jeff Zhang  wrote:

> Thanks for the reply, @Till Rohrmann .  Regarding
> reuse computed results. I think JM keep all the metadata of intermediate
> data, and interactive programming is also trying to reuse computed results.
> It looks like it may not be necessary to introduce the session concept as
> long as we can achieve reusing computed results. Let me know if I understand it
> correctly.
>
>
>
> Till Rohrmann wrote on Tue, Jun 4, 2019 at 4:03 PM:
>
>> Hi Jeff,
>>
>> the session functionality which you find in Flink's client are the
>> remnants of an uncompleted feature which was abandoned. The idea was that
>> one could submit multiple parts of a job to the same cluster where these
>> parts are added to the same ExecutionGraph. That way we wanted to allow to
>> reuse computed results when using a notebook for ad-hoc queries, for
>> example. But as I said, this feature has never been completed.
>>
>> Cheers,
>> Till
>>
>> On Sun, Jun 2, 2019 at 3:20 PM Jeff Zhang  wrote:
>>
>>>
>>> Hi Folks,
>>>
>>>
>>> When I read the flink client api code, the concept of session is a
>>> little vague and unclear to me. It looks like the session concept is only
>>> applied in batch mode (I only see it in ExecutionEnvironment but not in
>>> StreamExecutionEnvironment). But for local mode
>>> (LocalExecutionEnvironment), starting one new session is starting one new
>>> MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting one
>>> new session is just starting one new ClusterClient instead of one new
>>> cluster. So I am confused what does flink session really mean. Could anyone
>>> help me understand this ? Thanks.
>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: BigQuery source ?

2019-06-04 Thread Richard Deurwaarder
I looked into this briefly a while ago out of interest and read about
how Beam handles this. I've never actually implemented it, but the concept
sounds reasonable to me.

What I read from their code is that Beam exports the BigQuery data to
Google Storage. This export shards the data into files with a max size of 1 GB,
and these files are then processed by the 'source functions' in Beam.

I think implementing this in Flink would require the following:

* Before starting the Flink job, run the BigQuery to Google Storage export (
https://cloud.google.com/bigquery/docs/exporting-data)
* Start the Flink job and point it towards the Google Storage files (using
https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage to
easily read from Google Storage buckets)

So the job might look something like this:

> List files = doBigQueryExportJob();
> DataSet records = environment.fromCollection(files)
> .flatMap(new ReadFromFile())
> .map(doWork());
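
(A hedged sketch of what the doBigQueryExportJob() step above could look like
with the google-cloud-bigquery Java client; the project, dataset, bucket, and
format are illustrative assumptions, not taken from this thread:)

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.ExtractJobConfiguration;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TableId;

// Sketch only: run a BigQuery extract job that shards the table into files on
// Google Storage, which the Flink job then reads as described above.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableId table = TableId.of("my-project", "my_dataset", "my_table");
ExtractJobConfiguration extractConfig =
    ExtractJobConfiguration.newBuilder(table, "gs://my-bucket/export/part-*.json")
        .setFormat("NEWLINE_DELIMITED_JSON")
        .build();

Job job = bigquery.create(JobInfo.of(extractConfig));
job = job.waitFor(); // blocks until the export job finishes
// Check job.getStatus().getError() before handing the exported files to Flink.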


On Fri, May 31, 2019 at 10:15 AM Niels Basjes  wrote:

> Hi,
>
> Has anyone created a source to READ from BigQuery into Flink yet (we
> have Flink running on K8S in the Google cloud)?
> I would like to retrieve a DataSet in a distributed way (the data ...
> it's kinda big) and process that with Flink running on k8s (which we
> have running already).
>
> So far I have not been able to find anything yet.
> Any pointers/hints/code fragments are welcome.
>
> Thanks
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: Read file from S3 and write to kafka

2019-06-04 Thread miki haiat
You can use the DataSet API to parse files from S3.

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/batch/#data-sources
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#s3-simple-storage-service


And then parse it and send it to Kafka.
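
(A hedged sketch of one possible shape, using the DataStream API, which has a
ready-made Kafka sink, rather than the DataSet API linked above. The S3 path,
broker, and topic are illustrative, and the S3 filesystem plus the universal
Kafka connector are assumed to be on the classpath:)

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "broker:9092"); // illustrative

env.readTextFile("s3://my-bucket/input/data.txt")            // one record per line
   .addSink(new FlinkKafkaProducer<>("my-topic", new SimpleStringSchema(), kafkaProps));

env.execute("s3-to-kafka");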



On Tue, Jun 4, 2019 at 5:57 AM anurag  wrote:

> Hi All,
> I have searched a lot on Google but could not find how I can achieve writing
> a Flink function which reads a file in S3 and, for each line in the file,
> writes a message to Kafka.
> Thanks a lot, much appreciated. I am sorry if I did not search properly.
> Thanks,
> Anurag
>


error in flink

2019-06-04 Thread yuvraj singh
Hi all ,

I am having a problem. I was running a job, then I submitted one more job
on the same cluster, and my old job started failing with:

2019-06-04 15:12:11,593 ERROR
org.apache.flink.yarn.YarnResourceManager - Could not start
TaskManager in container.
java.lang.OutOfMemoryError: unable to create new native thread

please help me

Thanks








Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-06-04 Thread zhijiang
The jira is https://issues.apache.org/jira/browse/FLINK-12544 and you could 
find the PR link in it.
--
From:Erai, Rahul 
Send Time: Tuesday, June 4, 2019 18:19
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski ; 
"Narayanaswamy, Krishna" 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" 
Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Thanks Zhijiang.
Can you point us to the JIRA for your fix?

Regards,
-Rahul

From: zhijiang  
Sent: Tuesday, June 4, 2019 1:26 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 
; Erai, Rahul [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] 
Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Yes, it is the same case as multiple slots in a TM. The source task and co-group 
task are still in the same TM in this case. I think you might have enabled slot 
sharing, so they are still running in the same slot in one TM.
BTW, the previous deadlock issue is already fixed on my side, and it is waiting 
for review atm. You could pick up the code in the PR to verify the results if you 
like. And the next release-1.8.1 might cover this fix as well.
Best,
Zhijiang
--
From:Erai, Rahul 
Send Time: Tuesday, June 4, 2019 15:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski ; 
"Narayanaswamy, Krishna" 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" 
Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hello Zhijiang,
We have been seeing deadlocks with single slot TMs as well. Attaching the 
thread dump as requested. Looks similar to what was had with multi-slots TMs.
Thanks,
Rahul

From: zhijiang  
Sent: Wednesday, May 22, 2019 7:56 AM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
If it is still the case of multiple slots in one TaskManager, it is the same as 
before. But you said you already used the single slot per TaskManager, right?
If it is the case of single slot in TaskManager, you could attach the jstack 
when it occurs next time, otherwise it is not needed.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time: Wednesday, May 22, 2019 00:49
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi Zhijiang,

I couldn’t get the jstack due to some constraints this time around. Will try 
and get them when it occurs next. But from the looks of it from the 
console/logs it appears to be the same as the 2 slot cases. DataSource 
finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck 
at DEPLOYING)

Thanks,
Krishna.

From: zhijiang  
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi  Krishna,
Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time: Tuesday, May 21, 2019 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks

We started to run jobs using the single-slotted task managers, which seemed to 
be OK for the past couple of days, but this morning we seem to be seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with a single-slot approach to see how they 
go. Will await the fix to start using more slots on the single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 

Re: What does flink session mean ?

2019-06-04 Thread Jeff Zhang
Thanks for the reply, @Till Rohrmann. Regarding
reusing computed results: I think the JM keeps all the metadata of intermediate
data, and interactive programming is also trying to reuse computed results.
It looks like it may not be necessary to introduce the session concept as
long as we can achieve reusing computed results. Let me know if I understand it
correctly.



Till Rohrmann wrote on Tue, Jun 4, 2019 at 4:03 PM:

> Hi Jeff,
>
> the session functionality which you find in Flink's client are the
> remnants of an uncompleted feature which was abandoned. The idea was that
> one could submit multiple parts of a job to the same cluster where these
> parts are added to the same ExecutionGraph. That way we wanted to allow to
> reuse computed results when using a notebook for ad-hoc queries, for
> example. But as I said, this feature has never been completed.
>
> Cheers,
> Till
>
> On Sun, Jun 2, 2019 at 3:20 PM Jeff Zhang  wrote:
>
>>
>> Hi Folks,
>>
>>
>> When I read the flink client api code, the concept of session is a little
>> vague and unclear to me. It looks like the session concept is only applied
>> in batch mode (I only see it in ExecutionEnvironment but not in
>> StreamExecutionEnvironment). But for local mode
>> (LocalExecutionEnvironment), starting one new session is starting one new
>> MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting one
>> new session is just starting one new ClusterClient instead of one new
>> cluster. So I am confused what does flink session really mean. Could anyone
>> help me understand this ? Thanks.
>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: What does flink session mean ?

2019-06-04 Thread Till Rohrmann
Hi Jeff,

the session functionality which you find in Flink's client is the remnant
of an uncompleted feature that was abandoned. The idea was that one could
submit multiple parts of a job to the same cluster, where these parts are
added to the same ExecutionGraph. That way we wanted to allow reusing
computed results when using a notebook for ad-hoc queries, for example. But
as I said, this feature was never completed.

Cheers,
Till

On Sun, Jun 2, 2019 at 3:20 PM Jeff Zhang  wrote:

>
> Hi Folks,
>
>
> When I read the flink client api code, the concept of session is a little
> vague and unclear to me. It looks like the session concept is only applied
> in batch mode (I only see it in ExecutionEnvironment but not in
> StreamExecutionEnvironment). But for local mode
> (LocalExecutionEnvironment), starting one new session is starting one new
> MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting one
> new session is just starting one new ClusterClient instead of one new
> cluster. So I am confused what does flink session really mean. Could anyone
> help me understand this ? Thanks.
>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-06-04 Thread zhijiang
Yes, it is the same case as multiple slots in a TM. The source task and co-group 
task are still in the same TM in this case. I think you might have enabled slot 
sharing, so they are still running in the same slot in one TM.
BTW, the previous deadlock issue is already fixed on my side, and it is waiting 
for review atm. You could pick up the code in the PR to verify the results if you 
like. And the next release-1.8.1 might cover this fix as well.

Best,
Zhijiang
--
From:Erai, Rahul 
Send Time: Tuesday, June 4, 2019 15:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski ; 
"Narayanaswamy, Krishna" 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" 
Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Hello Zhijiang,
We have been seeing deadlocks with single slot TMs as well. Attaching the 
thread dump as requested. Looks similar to what was had with multi-slots TMs.
Thanks,
Rahul

From: zhijiang  
Sent: Wednesday, May 22, 2019 7:56 AM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
If it is still the case of multiple slots in one TaskManager, it is the same as 
before. But you said you already used the single slot per TaskManager, right?
If it is the case of single slot in TaskManager, you could attach the jstack 
when it occurs next time, otherwise it is not needed.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time: Wednesday, May 22, 2019 00:49
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi Zhijiang,

I couldn’t get the jstack due to some constraints this time around. Will try 
and get them when it occurs next. But from the looks of it from the 
console/logs it appears to be the same as the 2 slot cases. DataSource 
finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck 
at DEPLOYING)

Thanks,
Krishna.

From: zhijiang  
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi  Krishna,
Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time: Tuesday, May 21, 2019 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks

We started to run jobs using the single-slotted task managers, which seemed to 
be OK for the past couple of days, but this morning we seem to be seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with a single-slot approach to see how they 
go. Will await the fix to start using more slots on the single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I have already analyzed this deadlock case based on the code. FLINK-10491
already solved one place that could cause a deadlock in SpillableSubpartition,
but this is a different place causing this issue.
When the source task is trying to release subpartition memory and meanwhile
another CoGroup task is submitted, triggering the source task to release its
memory, it might cause a deadlock.
I will create a jira ticket for this issue and think about how to solve it soon.
Currently, if you still want to use the blocking type, the simple way to avoid
this is to configure only one slot per TM; then one task can never trigger
another task to release memory in the same TM. Or you could increase the
network buffer settings to work around it.
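
(For reference, a minimal flink-conf.yaml sketch of the two workarounds
described above; the values are illustrative and need to be sized to the
actual job:)

taskmanager.numberOfTaskSlots: 1

# ...or keep multiple slots and give the network stack more buffer memory:
taskmanager.network.memory.fraction: 0.2
taskmanager.network.memory.min: 256mb
taskmanager.network.memory.max: 2gb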