Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Xingbo Huang
Hi Pierre,

The serialization/deserialization of sparse Rows in Flink is specially
optimized. The principle is that each Row gets a leading mask when
serialized, identifying whether the field at a given position is NULL,
with one bit per field. For example, if you have 10k fields, there will be
a mask of 10k bits / 8 = 1250 bytes. In this way, the
serialization/deserialization overhead can be skipped for the field
values that are NULL.

For the specific optimization logic, you can refer to the Java
implementation [1], the Python implementation [2], or the Cython implementation [3].

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java#L185
[2]
https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py#L100
[3]
https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/coder_impl_fast.pyx#L697
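
As an illustration only, here is a minimal Scala sketch of that mask scheme (not Flink's actual RowSerializer): each null field costs one bit in a leading mask, and only the non-null values are written after it.

import java.io.{ByteArrayOutputStream, DataOutputStream}

object NullMaskSketch {
  // One leading bit per field marks NULL, so a null field costs a bit in the
  // mask instead of a serialized value.
  def serialize(fields: Array[Integer]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    val mask = new Array[Byte]((fields.length + 7) / 8) // e.g. 10k fields -> 1250 bytes
    for (i <- fields.indices if fields(i) == null) {
      mask(i >> 3) = (mask(i >> 3) | (1 << (i & 7))).toByte
    }
    out.write(mask)                                     // mask first
    fields.filter(_ != null).foreach(v => out.writeInt(v.intValue())) // then only non-null values
    out.flush()
    bytes.toByteArray
  }
}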

Best,
Xingbo

Pierre Oberholzer wrote on Thu, Dec 3, 2020 at 3:08 PM:

> Hi Xingbo, Community,
>
> Thanks a lot for your support.
> May I ask a final question to conclude this thread, also for the wider audience:
> - Are serious performance issues to be expected with 100k fields per ROW
> (i.e. due solely to metadata overhead, independently of the query logic)?
> - Is sparse population (say 99% sparsity) already optimized in the ROW
> object, or are sparse types on your roadmap?
> Any experience with sparse Tables from other users (including benchmarks
> vs. other frameworks) is also highly welcome.
>
> Thanks !
>
> Best
>
>
> On Thu, Dec 3, 2020 at 02:53, Xingbo Huang wrote:
>
>> Hi Pierre,
>>
>> This example is written based on the syntax of release-1.12, which is about
>> to be released, and the test passed. In release-1.12, input_type can be
>> omitted and expressions can be used directly. If you are using release-1.11,
>> you only need to slightly adapt the UDF syntax according to the
>> UDF documentation [1].
>>
>> The Flink table connectors support the Avro format; please refer to the
>> documentation [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
>>
>> Best,
>> Xingbo
>>
>> Pierre Oberholzer wrote on Thu, Dec 3, 2020 at 2:57 AM:
>>
>>> Hi Xingbo,
>>>
>>> Nice ! This looks a bit hacky, but shows that it can be done ;)
>>>
>>> I just got an exception preventing me from running your code, apparently from
>>> udf.py:
>>>
>>> TypeError: Invalid input_type: input_type should be DataType but
>>> contains None
>>>
>>> Can you please check again?
>>> If the schema is defined in a .avsc file, do we have to parse it and
>>> rebuild that syntax (DDL and UDF), or is there an existing component
>>> that could be used?
>>>
>>> Thanks a lot !
>>>
>>> Best,
>>>
>>>
>>> On Wed, Dec 2, 2020 at 04:50, Xingbo Huang wrote:
>>>
 Hi Pierre,

 I wrote a PyFlink implementation, you can see if it meets your needs:


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())

    t_env.get_config().get_configuration().set_string(
        "taskmanager.memory.task.off-heap.size", '80m')

    # 10k nested columns
    num_field = 10_000
    fields = ['f%s INT' % i for i in range(num_field)]
    field_str = ','.join(fields)
    t_env.execute_sql(f"""
        CREATE TABLE source_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen',
            'number-of-rows' = '2'
        )
    """)

    t_env.execute_sql(f"""
        CREATE TABLE print_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
    """)

    result_type = DataTypes.ROW(
        [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in range(num_field)])

    func = udf(lambda x: x, result_type=result_type)

    source = t_env.from_path("source_table")
    result = source.select(source.f0, source.f1, func(source.f2), source.f3)
    result.execute_insert("print_table")


if __name__ == '__main__':
    test()



Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

2020-12-02 Thread Rex Fenley
It appears that even when I pass id through the map function and join back
with the original table, Flink still does not seem to treat the id passed
through map as a unique key. Is there any way to solve this while still
preserving the primary key?

On Wed, Dec 2, 2020 at 5:27 PM Rex Fenley  wrote:

> Even odder, if I pull the constructor of the function into its own
> variable it "works" (though it appears that map only passes through the
> fields mapped over which means I'll need an additional join, though now I
> think I'm on the right path).
>
> I.e.
> def splatFruits(table: Table, columnPrefix: String): Table = {
>   val func = new SplatFruitsFunc()
>   return table
> .map(func($"fruits"))
> .as(
>   s"${columnPrefix}_has_orange",
>   s"${columnPrefix}_has_banana",
>   s"${columnPrefix}_has_apple",
>   s"${columnPrefix}_has_watermelon"
>)
>.renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
> }
>
> ends up giving me the following error instead
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Cannot resolve field [fruits], input field
> list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
> prefix_has_watermelon].
>
> which implies I'll need to join back to the original table like I was
> doing with the leftOuterJoinLateral originally I suppose.
>
>
> On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley  wrote:
>
>> Looks like `as` needed to move outside of where it was before to fix that
>> error. Though now I'm receiving
>> >org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Aliasing more fields than we actually have.
>>
>> Example code now:
>>
>> // table will always have pk id
>> def splatFruits(table: Table, columnPrefix: String): Table = {
>> return table
>>  .map(
>>new SplatFruitsFunc()(
>>  $"fruits"
>>)
>>  )
>>  .as(
>>   s"${columnPrefix}_has_orange",
>>   s"${columnPrefix}_has_banana",
>>   s"${columnPrefix}_has_apple",
>>   s"${columnPrefix}_has_watermelon"
>>  )
>>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>> }
>>
>> class SplatFruitsFunc extends ScalarFunction {
>>   def eval(fruits: Array[String]): Row = {
>> val hasOrange: java.lang.Boolean = fruits.contains("Orange")
>> val hasBanana: java.lang.Boolean = fruits.contains("Banana")
>> val hasApple: java.lang.Boolean = fruits.contains("Apple")
>> val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
>> Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>>   }
>>
>>   override def getResultType(signature: Array[Class[_]]):
>> TypeInformation[_] =
>> Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
>> }
>>
>> which afaict correctly follows the documentation.
>>
>> Anything here stand out?
>>
>> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley  wrote:
>>
>>> So instead I just tried changing SplatFruitsFunc to a ScalarFunction and
>>> leftOuterJoinLateral to a map, and I'm receiving:
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Only a scalar function can be used in the map
>>> operator.
>>> which seems odd because documentation says
>>>
>>> > Performs a map operation with a user-defined scalar function or
>>> built-in scalar function. The output will be flattened if the output type
>>> is a composite type.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>>>
>>> Shouldn't this work as an alternative?
>>>
>>> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley  wrote:
>>>
 Hello,

 I have a TableFunction and wherever it is applied with a
 leftOuterJoinLateral, my table loses any inference of there being a primary
 key. I see this because all subsequent joins end up with "NoUniqueKey" when
 I know a primary key of id should exist.

 I'm wondering if this is expected behavior and if it's possible to tell
 a table directly what the primary key should be?


 To demonstrate my example:
 My table function checks if an element of a certain type is in a string
 array, and depending on whether or not it is there, it appends a column
 with value true or false. For example, if array "fruits" which could
 possibly contain orange, banana, apple, and watermelon on a row contains
 only `["orange", "apple"]` then it will append `has_orange: true,
 has_banana: false, has_apple: true, has_watermelon: false` as columns to
 the row. This example is essentially the same as my code, outside of having
 a much larger set of keys and not dealing with fruits.

 Example code:

 // table will always have pk id
 def splatFruits(table: Table, columnPrefix: String): Table = {
 return table
  .leftOuterJoinLateral(
new SplatFruitsFunc()(
  $"fruits"
) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
 s"${c

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Pierre Oberholzer
Hi Xingbo, Community,

Thanks a lot for your support.
May I ask a final question to conclude this thread, also for the wider audience:
- Are serious performance issues to be expected with 100k fields per ROW
(i.e. due solely to metadata overhead, independently of the query logic)?
- Is sparse population (say 99% sparsity) already optimized in the ROW
object, or are sparse types on your roadmap?
Any experience with sparse Tables from other users (including benchmarks vs.
other frameworks) is also highly welcome.

Thanks !

Best


On Thu, Dec 3, 2020 at 02:53, Xingbo Huang wrote:

> Hi Pierre,
>
> This example is written based on the syntax of release-1.12, which is about
> to be released, and the test passed. In release-1.12, input_type can be
> omitted and expressions can be used directly. If you are using release-1.11,
> you only need to slightly adapt the UDF syntax according to the
> UDF documentation [1].
>
> The Flink table connectors support the Avro format; please refer to the
> documentation [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
>
> Best,
> Xingbo
>
> Pierre Oberholzer wrote on Thu, Dec 3, 2020 at 2:57 AM:
>
>> Hi Xingbo,
>>
>> Nice ! This looks a bit hacky, but shows that it can be done ;)
>>
>> I just got an exception preventing me from running your code, apparently from
>> udf.py:
>>
>> TypeError: Invalid input_type: input_type should be DataType but contains
>> None
>>
>> Can you please check again?
>> If the schema is defined in a .avsc file, do we have to parse it and
>> rebuild that syntax (DDL and UDF), or is there an existing component
>> that could be used?
>>
>> Thanks a lot !
>>
>> Best,
>>
>>
>> On Wed, Dec 2, 2020 at 04:50, Xingbo Huang wrote:
>>
>>> Hi Pierre,
>>>
>>> I wrote a PyFlink implementation, you can see if it meets your needs:
>>>
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
>>> DataTypes
>>> from pyflink.table.udf import udf
>>>
>>>
>>> def test():
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> t_env = StreamTableEnvironment.create(env,
>>>
>>> environment_settings=EnvironmentSettings.new_instance()
>>>
>>> .in_streaming_mode().use_blink_planner().build())
>>>
>>> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>>>   '80m')
>>>
>>> # 10k nested columns
>>> num_field = 10_000
>>> fields = ['f%s INT' % i for i in range(num_field)]
>>> field_str = ','.join(fields)
>>> t_env.execute_sql(f"""
>>> CREATE TABLE source_table (
>>> f0 BIGINT,
>>> f1 DECIMAL(32,2),
>>> f2 ROW<${field_str}>,
>>> f3 TIMESTAMP(3)
>>> ) WITH (
>>>   'connector' = 'datagen',
>>>   'number-of-rows' = '2'
>>> )
>>> """)
>>>
>>> t_env.execute_sql(f"""
>>> CREATE TABLE print_table (
>>>  f0 BIGINT,
>>>  f1 DECIMAL(32,2),
>>>  f2 ROW<${field_str}>,
>>>  f3 TIMESTAMP(3)
>>> ) WITH (
>>>  'connector' = 'print'
>>> )
>>> """)
>>> result_type = DataTypes.ROW(
>>> [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in
>>> range(num_field)])
>>>
>>> func = udf(lambda x: x, result_type=result_type)
>>>
>>> source = t_env.from_path("source_table")
>>> result = source.select(source.f0, source.f1, func(source.f2),
>>> source.f3)
>>> result.execute_insert("print_table")
>>>
>>>
>>> if __name__ == '__main__':
>>> test()
>>>
>>>
>>>  Best,
>>>  Xingbo
>>>
>>> Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 6:10 PM:
>>>
 Hi Xingbo,

 That would mean giving up on using Flink (table) features on the
 content of the parsed JSON objects, so definitely a big loss. Let me know
 if I missed something.

 Thanks !

 On Tue, Dec 1, 2020 at 07:26, Xingbo Huang wrote:

> Hi Pierre,
>
> Have you ever thought of declaring your entire json as a string field
> in `Table` and putting the parsing work in UDF?
>
> Best,
> Xingbo
>
> Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 4:13 AM:
>
>> Hi Xingbo,
>>
>> Many thanks for your follow up. Yes you got it right.
>> So using Table API and a ROW object for the nested output of my UDF,
>> and since types are mandatory, I guess this boils down to:
>> - How to nicely specify the types for the 100k fields : shall I use
>> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
>> - Do I have to put NULL values for all the fields that don't have a
>> value in my JSON ?
>> - Will the resulting Table be "sparse" a

Re: Running Flink job as a rest

2020-12-02 Thread Yun Tang
Hi Dhurandar,

I'm afraid that Flink's REST API cannot satisfy your request, as it does not 
act as a source. One possible example could be SocketWindowWordCount [1], 
in which the sources listen for data on a port on the taskmanagers.

[1] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
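
For illustration, a minimal Scala sketch along those lines, assuming the Scala DataStream API: Flink itself listens on a plain TCP socket (not an HTTP endpoint on port 443), and the pushed lines flow straight into a windowed aggregation.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketIngestSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .socketTextStream("localhost", 9999) // clients push newline-separated records to this port
      .map(line => (line, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()
    env.execute("socket-ingest-sketch")
  }
}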

Best
Yun Tang

From: dhurandar S 
Sent: Thursday, December 3, 2020 5:31
To: Flink Dev ; user 
Subject: Running Flink job as a rest

Can a Flink job run as a REST server, where the Apache Flink job listens on 
a port (443)? When a user calls this URL with a payload, the data goes directly to 
the Apache Flink windowing function.

Right now Flink can ingest data from Kafka or Kinesis, but we have a use case 
where we would like to push data to Flink, where Flink is listening on a port

--
Thank you and regards,
Dhurandar



Re: Questions regarding DDL and savepoints

2020-12-02 Thread Yun Tang
Hi Kevin,

If you pass the savepoint path when resuming the application [1], the application will 
resume from the last savepoint.
If you change the logic of your DDL, then since no uid can be set by users, I 
am afraid not all state can be restored as you expect.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#restore-a-savepoint
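
For reference, a minimal sketch of the restore command described in [1] (the savepoint path and jar name are placeholders):

./bin/flink run -s hdfs:///flink/savepoints/savepoint-abc123 /path/to/application.jar

Adding -n / --allowNonRestoredState lets the job start even if some state in the savepoint cannot be mapped back to the new program.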

Best
Yun Tang

From: Kevin Kwon 
Sent: Thursday, December 3, 2020 8:31
To: user@flink.apache.org 
Subject: Questions regarding DDL and savepoints

I have a question regarding DDLs: are they considered operators, and can they be 
savepointed?

For example

CREATE TABLE mytable (
  id BIGINT,
  data STRING
  WATERMARK(...)
) with (
  connector = 'kafka'
)

If I create the table like above, save & exit, and then resume the application, will the 
application start from the savepoint (including the Kafka offsets)?

There's also an open issue that I created about whether operator names can 
be specified when creating tables with DDLs:
https://issues.apache.org/jira/browse/FLINK-20368


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Xingbo Huang
Hi Pierre,

This example is written based on the syntax of release-1.12, which is about
to be released, and the test passed. In release-1.12, input_type can be
omitted and expressions can be used directly. If you are using release-1.11,
you only need to slightly adapt the UDF syntax according to the
UDF documentation [1].

The Flink table connectors support the Avro format; please refer to the
documentation [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format

Best,
Xingbo

Pierre Oberholzer wrote on Thu, Dec 3, 2020 at 2:57 AM:

> Hi Xingbo,
>
> Nice ! This looks a bit hacky, but shows that it can be done ;)
>
> I just got an exception preventing me from running your code, apparently from
> udf.py:
>
> TypeError: Invalid input_type: input_type should be DataType but contains
> None
>
> Can you please check again?
> If the schema is defined in a .avsc file, do we have to parse it and
> rebuild that syntax (DDL and UDF), or is there an existing component
> that could be used?
>
> Thanks a lot !
>
> Best,
>
>
> On Wed, Dec 2, 2020 at 04:50, Xingbo Huang wrote:
>
>> Hi Pierre,
>>
>> I wrote a PyFlink implementation, you can see if it meets your needs:
>>
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
>> DataTypes
>> from pyflink.table.udf import udf
>>
>>
>> def test():
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> t_env = StreamTableEnvironment.create(env,
>>
>> environment_settings=EnvironmentSettings.new_instance()
>>
>> .in_streaming_mode().use_blink_planner().build())
>>
>> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>>   '80m')
>>
>> # 10k nested columns
>> num_field = 10_000
>> fields = ['f%s INT' % i for i in range(num_field)]
>> field_str = ','.join(fields)
>> t_env.execute_sql(f"""
>> CREATE TABLE source_table (
>> f0 BIGINT,
>> f1 DECIMAL(32,2),
>> f2 ROW<${field_str}>,
>> f3 TIMESTAMP(3)
>> ) WITH (
>>   'connector' = 'datagen',
>>   'number-of-rows' = '2'
>> )
>> """)
>>
>> t_env.execute_sql(f"""
>> CREATE TABLE print_table (
>>  f0 BIGINT,
>>  f1 DECIMAL(32,2),
>>  f2 ROW<${field_str}>,
>>  f3 TIMESTAMP(3)
>> ) WITH (
>>  'connector' = 'print'
>> )
>> """)
>> result_type = DataTypes.ROW(
>> [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in
>> range(num_field)])
>>
>> func = udf(lambda x: x, result_type=result_type)
>>
>> source = t_env.from_path("source_table")
>> result = source.select(source.f0, source.f1, func(source.f2),
>> source.f3)
>> result.execute_insert("print_table")
>>
>>
>> if __name__ == '__main__':
>> test()
>>
>>
>>  Best,
>>  Xingbo
>>
>> Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 6:10 PM:
>>
>>> Hi Xingbo,
>>>
>>> That would mean giving up on using Flink (table) features on the content
>>> of the parsed JSON objects, so definitely a big loss. Let me know if I
>>> missed something.
>>>
>>> Thanks !
>>>
>>> On Tue, Dec 1, 2020 at 07:26, Xingbo Huang wrote:
>>>
 Hi Pierre,

 Have you ever thought of declaring your entire json as a string field
 in `Table` and putting the parsing work in UDF?

 Best,
 Xingbo

 Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 4:13 AM:

> Hi Xingbo,
>
> Many thanks for your follow up. Yes you got it right.
> So using Table API and a ROW object for the nested output of my UDF,
> and since types are mandatory, I guess this boils down to:
> - How to nicely specify the types for the 100k fields : shall I use
> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
> - Do I have to put NULL values for all the fields that don't have a
> value in my JSON ?
> - Will the resulting Table be "sparse" and suffer performance
> limitations ?
> Let me know if Table API and ROW are the right candidates here, or if
> other better alternatives exist.
> As said I'd be glad to apply some downstream transformations using
> key,value access (and possibly some Table <-> Pandas operations). Hope 
> that
> doesn't make it a too long wish list ;)
>
> Thanks a lot !
>
> Best regards,
>
> [1]
> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
> [2]
> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>
> On Sat, Nov 28, 2020 at 04:04, Xingbo Huang wrote:
>
>> Hi Pierre,
>>

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

2020-12-02 Thread Rex Fenley
Even odder, if I pull the constructor of the function into its own variable
it "works" (though it appears that map only passes through the fields
mapped over which means I'll need an additional join, though now I think
I'm on the right path).

I.e.
def splatFruits(table: Table, columnPrefix: String): Table = {
  val func = new SplatFruitsFunc()
  return table
.map(func($"fruits"))
.as(
  s"${columnPrefix}_has_orange",
  s"${columnPrefix}_has_banana",
  s"${columnPrefix}_has_apple",
  s"${columnPrefix}_has_watermelon"
   )
   .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}

ends up giving me the following error instead
> org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Cannot resolve field [fruits], input field
list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
prefix_has_watermelon].

which implies I'll need to join back to the original table like I was doing
with the leftOuterJoinLateral originally I suppose.


On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley  wrote:

> Looks like `as` needed to move outside of where it was before to fix that
> error. Though now I'm receiving
> >org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Aliasing more fields than we actually have.
>
> Example code now:
>
> // table will always have pk id
> def splatFruits(table: Table, columnPrefix: String): Table = {
> return table
>  .map(
>new SplatFruitsFunc()(
>  $"fruits"
>)
>  )
>  .as(
>   s"${columnPrefix}_has_orange",
>   s"${columnPrefix}_has_banana",
>   s"${columnPrefix}_has_apple",
>   s"${columnPrefix}_has_watermelon"
>  )
>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
> }
>
> class SplatFruitsFunc extends ScalarFunction {
>   def eval(fruits: Array[String]): Row = {
> val hasOrange: java.lang.Boolean = fruits.contains("Orange")
> val hasBanana: java.lang.Boolean = fruits.contains("Banana")
> val hasApple: java.lang.Boolean = fruits.contains("Apple")
> val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
> Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>   }
>
>   override def getResultType(signature: Array[Class[_]]):
> TypeInformation[_] =
> Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
> }
>
> which afaict correctly follows the documentation.
>
> Anything here stand out?
>
> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley  wrote:
>
>> So instead I just tried changing SplatFruitsFunc to a ScalarFunction and
>> leftOuterJoinLateral to a map, and I'm receiving:
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Only a scalar function can be used in the map
>> operator.
>> which seems odd because documentation says
>>
>> > Performs a map operation with a user-defined scalar function or
>> built-in scalar function. The output will be flattened if the output type
>> is a composite type.
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>>
>> Shouldn't this work as an alternative?
>>
>> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> I have a TableFunction and wherever it is applied with a
>>> leftOuterJoinLateral, my table loses any inference of there being a primary
>>> key. I see this because all subsequent joins end up with "NoUniqueKey" when
>>> I know a primary key of id should exist.
>>>
>>> I'm wondering if this is expected behavior and if it's possible to tell
>>> a table directly what the primary key should be?
>>>
>>>
>>> To demonstrate my example:
>>> My table function checks if an element of a certain type is in a string
>>> array, and depending on whether or not it is there, it appends a column
>>> with value true or false. For example, if array "fruits" which could
>>> possibly contain orange, banana, apple, and watermelon on a row contains
>>> only `["orange", "apple"]` then it will append `has_orange: true,
>>> has_banana: false, has_apple: true, has_watermelon: false` as columns to
>>> the row. This example is essentially the same as my code, outside of having
>>> a much larger set of keys and not dealing with fruits.
>>>
>>> Example code:
>>>
>>> // table will always have pk id
>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>> return table
>>>  .leftOuterJoinLateral(
>>>new SplatFruitsFunc()(
>>>  $"fruits"
>>>) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
>>> s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon")
>>>  )
>>>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>> }
>>>
>>> @FunctionHint(
>>>   output = new DataTypeHint(
>>> "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN,
>>> has_watermelon BOOLEAN)"
>>>   )
>>> )
>>> class SplatFruitsFunc
>>> extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>>>
>>>   def eval(fruits: Array[String]): Unit 

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

2020-12-02 Thread Rex Fenley
Looks like `as` needed to move outside of where it was before to fix that
error. Though now I'm receiving
>org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Aliasing more fields than we actually have.

Example code now:

// table will always have pk id
def splatFruits(table: Table, columnPrefix: String): Table = {
return table
 .map(
   new SplatFruitsFunc()(
 $"fruits"
   )
 )
 .as(
  s"${columnPrefix}_has_orange",
  s"${columnPrefix}_has_banana",
  s"${columnPrefix}_has_apple",
  s"${columnPrefix}_has_watermelon"
 )
 .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}

class SplatFruitsFunc extends ScalarFunction {
  def eval(fruits: Array[String]): Row = {
val hasOrange: java.lang.Boolean = fruits.contains("Orange")
val hasBanana: java.lang.Boolean = fruits.contains("Banana")
val hasApple: java.lang.Boolean = fruits.contains("Apple")
val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
  }

  override def getResultType(signature: Array[Class[_]]):
TypeInformation[_] =
Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
}

which afaict correctly follows the documentation.

Anything here stand out?

On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley  wrote:

> So instead I just tried changing SplatFruitsFunc to a ScalarFunction and
> leftOuterJoinLateral to a map, and I'm receiving:
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Only a scalar function can be used in the map
> operator.
> which seems odd because documentation says
>
> > Performs a map operation with a user-defined scalar function or built-in
> scalar function. The output will be flattened if the output type is a
> composite type.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>
> Shouldn't this work as an alternative?
>
> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley  wrote:
>
>> Hello,
>>
>> I have a TableFunction and wherever it is applied with a
>> leftOuterJoinLateral, my table loses any inference of there being a primary
>> key. I see this because all subsequent joins end up with "NoUniqueKey" when
>> I know a primary key of id should exist.
>>
>> I'm wondering if this is expected behavior and if it's possible to tell a
>> table directly what the primary key should be?
>>
>>
>> To demonstrate my example:
>> My table function checks if an element of a certain type is in a string
>> array, and depending on whether or not it is there, it appends a column
>> with value true or false. For example, if array "fruits" which could
>> possibly contain orange, banana, apple, and watermelon on a row contains
>> only `["orange", "apple"]` then it will append `has_orange: true,
>> has_banana: false, has_apple: true, has_watermelon: false` as columns to
>> the row. This example is essentially the same as my code, outside of having
>> a much larger set of keys and not dealing with fruits.
>>
>> Example code:
>>
>> // table will always have pk id
>> def splatFruits(table: Table, columnPrefix: String): Table = {
>> return table
>>  .leftOuterJoinLateral(
>>new SplatFruitsFunc()(
>>  $"fruits"
>>) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
>> s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon")
>>  )
>>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>> }
>>
>> @FunctionHint(
>>   output = new DataTypeHint(
>> "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN,
>> has_watermelon BOOLEAN)"
>>   )
>> )
>> class SplatFruitsFunc
>> extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>>
>>   def eval(fruits: Array[String]): Unit = {
>> val hasOrange: java.lang.Boolean = fruits.contains("orange")
>> val hasBanana: java.lang.Boolean = fruits.contains("banana")
>> val hasApple: java.lang.Boolean = fruits.contains("apple")
>> val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>> collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>>   }
>> }
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

2020-12-02 Thread Rex Fenley
So instead I just tried changing SplatFruitsFunc to a ScalarFunction and
leftOuterJoinLateral to a map, and I'm receiving:
> org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Only a scalar function can be used in the map
operator.
which seems odd because documentation says

> Performs a map operation with a user-defined scalar function or built-in
scalar function. The output will be flattened if the output type is a
composite type.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations

Shouldn't this work as an alternative?

On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley  wrote:

> Hello,
>
> I have a TableFunction and wherever it is applied with a
> leftOuterJoinLateral, my table loses any inference of there being a primary
> key. I see this because all subsequent joins end up with "NoUniqueKey" when
> I know a primary key of id should exist.
>
> I'm wondering if this is expected behavior and if it's possible to tell a
> table directly what the primary key should be?
>
>
> To demonstrate my example:
> My table function checks if an element of a certain type is in a string
> array, and depending on whether or not it is there, it appends a column
> with value true or false. For example, if array "fruits" which could
> possibly contain orange, banana, apple, and watermelon on a row contains
> only `["orange", "apple"]` then it will append `has_orange: true,
> has_banana: false, has_apple: true, has_watermelon: false` as columns to
> the row. This example is essentially the same as my code, outside of having
> a much larger set of keys and not dealing with fruits.
>
> Example code:
>
> // table will always have pk id
> def splatFruits(table: Table, columnPrefix: String): Table = {
> return table
>  .leftOuterJoinLateral(
>new SplatFruitsFunc()(
>  $"fruits"
>) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
> s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon")
>  )
>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
> }
>
> @FunctionHint(
>   output = new DataTypeHint(
> "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN,
> has_watermelon BOOLEAN)"
>   )
> )
> class SplatFruitsFunc
> extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>
>   def eval(fruits: Array[String]): Unit = {
> val hasOrange: java.lang.Boolean = fruits.contains("orange")
> val hasBanana: java.lang.Boolean = fruits.contains("banana")
> val hasApple: java.lang.Boolean = fruits.contains("apple")
> val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
> collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>   }
> }
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Questions regarding DDL and savepoints

2020-12-02 Thread Kevin Kwon
I have a question regarding DDLs: are they considered operators, and can
they be savepointed?

For example

CREATE TABLE mytable (
  id BIGINT,
  data STRING
  WATERMARK(...)
) with (
  connector = 'kafka'
)

If I create the table like above, save & exit, and then resume the application, will
the application start from the savepoint (including the Kafka offsets)?

There's also an open issue that I created about whether operator names
can be specified when creating tables with DDLs:
https://issues.apache.org/jira/browse/FLINK-20368


Primary keys go missing after using TableFunction + leftOuterJoinLateral

2020-12-02 Thread Rex Fenley
Hello,

I have a TableFunction and wherever it is applied with a
leftOuterJoinLateral, my table loses any inference of there being a primary
key. I see this because all subsequent joins end up with "NoUniqueKey" when
I know a primary key of id should exist.

I'm wondering if this is expected behavior and if it's possible to tell a
table directly what the primary key should be?


To demonstrate my example:
My table function checks if an element of a certain type is in a string
array, and depending on whether or not it is there, it appends a column
with value true or false. For example, if array "fruits" which could
possibly contain orange, banana, apple, and watermelon on a row contains
only `["orange", "apple"]` then it will append `has_orange: true,
has_banana: false, has_apple: true, has_watermelon: false` as columns to
the row. This example is essentially the same as my code, outside of having
a much larger set of keys and not dealing with fruits.

Example code:

// table will always have pk id
def splatFruits(table: Table, columnPrefix: String): Table = {
return table
 .leftOuterJoinLateral(
   new SplatFruitsFunc()(
 $"fruits"
   ) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon")
 )
 .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}

@FunctionHint(
  output = new DataTypeHint(
"(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN,
has_watermelon BOOLEAN)"
  )
)
class SplatFruitsFunc
extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {

  def eval(fruits: Array[String]): Unit = {
val hasOrange: java.lang.Boolean = fruits.contains("orange")
val hasBanana: java.lang.Boolean = fruits.contains("banana")
val hasApple: java.lang.Boolean = fruits.contains("apple")
val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
collect(hasOrange, hasBanana, hasApple, hasWatermelon)
  }
}

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Running Flink job as a rest

2020-12-02 Thread dhurandar S
Can a Flink job run as a REST server, where the Apache Flink job
listens on a port (443)? When a user calls this URL with a payload,
the data goes directly to the Apache Flink windowing function.

Right now Flink can ingest data from Kafka or Kinesis, but we have a use
case where we would like to push data to Flink, where Flink is listening on
a port

-- 
Thank you and regards,
Dhurandar


TextFile source && KeyedWindow triggers --> Unexpected execution order

2020-12-02 Thread ANON Marta
Hello!

I have a datastream like this:

env.readTextFile("events.log")
  .map(event => StopFactory(event)) // I have defined a Stop class and this creates an instance from the file line
  .assignTimestampsAndWatermarks(stopEventTimeExtractor) // extract the timestamp from a field of each instance
  .keyBy("mediaResource.contentId")
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS)))
  .trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.MINUTES)))
  .reduce(
    (eventA: Stop, _: Stop) => {
      eventA.addVisualization()
      println("** REDUCING **")
      eventA
    },
    (key: Tuple,
     window: TimeWindow,
     input: Iterable[Stop],
     out: Collector[Stop]) => {
      val stop = input.iterator.next()
      println("- PROCESSING ")
      if (stop.changed) {
        stop.changed = false
        out.collect(stop)
      }
    })
  .print()

If I execute this, I see that all the reduce calls are executed first, and only at the 
end does the window (processing) function part start to execute.

This is a problem for me because I want to add a sink function that outputs 
results every 5 minutes from the whole daily aggregation, and the goal is to support 
either a Kafka source (I didn't have any problem with Kafka) or a bunch of large 
files.

If I replace the ContinuousEventTimeTrigger with a CountTrigger, I see that it 
sinks and processes every "x" events, but only at the end of the file and after 
every reduce has been executed.

How could I manage this daily aggregation, with a 5-minute trigger for the partial 
aggregation, when the source is a file?

Thank you!!

Marta


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Pierre Oberholzer
Hi Xingbo,

Nice ! This looks a bit hacky, but shows that it can be done ;)

I just got an exception preventing me from running your code, apparently from
udf.py:

TypeError: Invalid input_type: input_type should be DataType but contains
None

Can you please check again?
If the schema is defined in a .avsc file, do we have to parse it and
rebuild that syntax (DDL and UDF), or is there an existing component
that could be used?

Thanks a lot !

Best,


On Wed, Dec 2, 2020 at 04:50, Xingbo Huang wrote:

> Hi Pierre,
>
> I wrote a PyFlink implementation, you can see if it meets your needs:
>
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
> DataTypes
> from pyflink.table.udf import udf
>
>
> def test():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env,
>
> environment_settings=EnvironmentSettings.new_instance()
>
> .in_streaming_mode().use_blink_planner().build())
>
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>   '80m')
>
> # 10k nested columns
> num_field = 10_000
> fields = ['f%s INT' % i for i in range(num_field)]
> field_str = ','.join(fields)
> t_env.execute_sql(f"""
> CREATE TABLE source_table (
> f0 BIGINT,
> f1 DECIMAL(32,2),
> f2 ROW<${field_str}>,
> f3 TIMESTAMP(3)
> ) WITH (
>   'connector' = 'datagen',
>   'number-of-rows' = '2'
> )
> """)
>
> t_env.execute_sql(f"""
> CREATE TABLE print_table (
>  f0 BIGINT,
>  f1 DECIMAL(32,2),
>  f2 ROW<${field_str}>,
>  f3 TIMESTAMP(3)
> ) WITH (
>  'connector' = 'print'
> )
> """)
> result_type = DataTypes.ROW(
> [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in
> range(num_field)])
>
> func = udf(lambda x: x, result_type=result_type)
>
> source = t_env.from_path("source_table")
> result = source.select(source.f0, source.f1, func(source.f2),
> source.f3)
> result.execute_insert("print_table")
>
>
> if __name__ == '__main__':
> test()
>
>
>  Best,
>  Xingbo
>
> Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 6:10 PM:
>
>> Hi Xingbo,
>>
>> That would mean giving up on using Flink (table) features on the content
>> of the parsed JSON objects, so definitely a big loss. Let me know if I
>> missed something.
>>
>> Thanks !
>>
>> On Tue, Dec 1, 2020 at 07:26, Xingbo Huang wrote:
>>
>>> Hi Pierre,
>>>
>>> Have you ever thought of declaring your entire json as a string field in
>>> `Table` and putting the parsing work in UDF?
>>>
>>> Best,
>>> Xingbo
>>>
>>> Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 4:13 AM:
>>>
 Hi Xingbo,

 Many thanks for your follow up. Yes you got it right.
 So using Table API and a ROW object for the nested output of my UDF,
 and since types are mandatory, I guess this boils down to:
 - How to nicely specify the types for the 100k fields : shall I use
 TypeInformation [1] or better retrieve it from Schema Registry [2] ?
 - Do I have to put NULL values for all the fields that don't have a
 value in my JSON ?
 - Will the resulting Table be "sparse" and suffer performance
 limitations ?
 Let me know if Table API and ROW are the right candidates here, or if
 other better alternatives exist.
 As said I'd be glad to apply some downstream transformations using
 key,value access (and possibly some Table <-> Pandas operations). Hope that
 doesn't make it a too long wish list ;)

 Thanks a lot !

 Best regards,

 [1]
 https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
 [2]
 https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html

 On Sat, Nov 28, 2020 at 04:04, Xingbo Huang wrote:

> Hi Pierre,
>
> Sorry for the late reply.
> Your requirement is that your `Table` has a `field` in JSON format
> whose number of keys has reached 100k, and you want to use such a `field` as
> the input/output of a `udf`, right? As to whether there is a limit on the
> number of nested keys, I am not quite sure. Other contributors with
> experience in this area may have answers. On the `Python UDF` side, if
> the type of the key or value of your `Map` is `Any`, we do not support it yet;
> you need to specify a concrete type. For more information, please refer to
> the related documentation [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>
> Best,
> Xingbo
>
> On Nov 28, 2020 at 12:49 AM, Pierre Oberholzer wrote:
>
> Hello Wei, Dian, Xingbo,
>
> No

Re: Process windows not firing - > Can it be a Watermak issue?

2020-12-02 Thread Simone Cavallarin
Hi Till

Super, understood! I will also read the website with the link that you provided 
me.

Thanks  and have a nice eve.

best

s


From: Till Rohrmann 
Sent: 02 December 2020 17:44
To: Simone Cavallarin 
Cc: user 
Subject: Re: Process windows not firing - > Can it be a Watermak issue?

Hi Simone,

You need to set this option because otherwise Flink will not generate the 
watermarks. If you don't need watermarks (e.g. when using processing time), the 
system can then send fewer records over the wire; that's basically how 
Flink was designed [1].

With Flink 1.12 this option no longer needs to be set because it is activated 
by default.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html

Cheers,
Till

On Wed, Dec 2, 2020 at 6:17 PM Simone Cavallarin 
mailto:cavalla...@hotmail.com>> wrote:
Hi Till and David,

First at all thanks for the quick reply, really appreciated!

Drilling down to the issue:


  1.  No session is closing because there isn't a sufficiently long gap in the 
test data -> It was the first thing that I thought of; before asking I ran a test 
checking the gap in my data and using a super small interval of 10 millis. The 
problem was not this.
  2.  The test runs to completion before the periodic watermark generator has 
generated any watermarks. -> The data has been flowing for hours and I can see 
everything while debugging the flow; every event carries all its information. I'm 
mapping my stream and using the stream itself to compute the gap, so I played 
with it for a while.
  3.  Which version of Flink are you using? - > 1.11.2
  4.  Have you enabled event time support via 
StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)?
 - > BINGO! this was the issue, as soon as I added this line on the code it 
started to work!! 🙂 (It would be nice to understand what this magic line is 
doing!)
  5.  The fact that assigner has no field to display simply means that it 
contains no fields which is normal for lambda functions-> Thanks I didn't know 
that. noted!

Why 
StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 is so important?

Thanks

s




From: David Anderson mailto:da...@alpinegizmo.com>>
Sent: 02 December 2020 16:01
To: Simone Cavallarin mailto:cavalla...@hotmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Process windows not firing - > Can it be a Watermak issue?

In cases like this one, one of the following tends to be the explanation:

- No session is closing because there isn't a sufficiently long gap in the test 
data.
- The test runs to completion before the periodic watermark generator has 
generated any watermarks.

David

From: Till Rohrmann mailto:trohrm...@apache.org>>
Sent: 02 December 2020 15:46
To: Simone Cavallarin mailto:cavalla...@hotmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: Process windows not firing - > Can it be a Watermak issue?

Hi Simone,

Which version of Flink are you using? Have you enabled event time support via 
StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)?
 Are you sure that the topic you are consuming contains data? Maybe you can 
share the whole job with example data with us so that we can take a look at it 
as a whole.

The fact that assigner has no field to display simply means that it contains no 
fields which is normal for lambda functions.

Cheers,
Till




Re: Routing events to different kafka topics dynamically

2020-12-02 Thread Till Rohrmann
Hi Prasanna,

I believe that what Aljoscha suggestd in the linked discussion is still the
best way to go forward. Given your description of the problem this should
actually be pretty straightforward as you can deduce the topic from the
message. Hence, you just need to create the ProducerRecord with the right
target topic you extracted from the record/message.
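
For illustration, a minimal Scala sketch of such a KafkaSerializationSchema; the Event type and the topic-naming rule are placeholders for your own record type and routing logic.

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

case class Event(routingKey: String, payload: String)

class DynamicTopicSchema extends KafkaSerializationSchema[Event] {
  override def serialize(element: Event,
                         timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] = {
    // derive the target topic from the message itself
    val topic = s"events-${element.routingKey}"
    new ProducerRecord[Array[Byte], Array[Byte]](
      topic, element.payload.getBytes(StandardCharsets.UTF_8))
  }
}

The schema is then handed to the FlinkKafkaProducer constructor that takes a default topic, the serialization schema, the producer properties, and a delivery semantic.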

Cheers,
Till

On Wed, Dec 2, 2020 at 5:28 PM Prasanna kumar 
wrote:

> Hi,
>
> Events need to be routed to different kafka topics dynamically based upon
> some info in the message.
>
> We have implemented using KeyedSerializationSchema similar to
> https://stackoverflow.com/questions/49508508/apache-flink-how-to-sink-events-to-different-kafka-topics-depending-on-the-even.
> But its deprecated and we cannot use it for production.
>
> I looked at the alternative KafkaSerializationSchema but there i do not
> see an option there.
>
> Then i stumbled upon this
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Usage-of-KafkaDeserializationSchema-and-KafkaSerializationSchema-td32347.html.
> which asks us to use KafkaContextAware.
>
> Is there a more intuitive/easier way to do the same ?
>
> Thanks,
> Prasanna.
>
>
>


Re: Process windows not firing - > Can it be a Watermak issue?

2020-12-02 Thread Till Rohrmann
Hi Simone,

You need to set this option because otherwise Flink will not generate the
watermarks. If you don't need watermarks (e.g. when using processing time),
the system can then send fewer records over the wire; that's basically
how Flink was designed [1].

With Flink 1.12 this option no longer needs to be set because it is
activated by default.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
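
As an illustration, a minimal Flink 1.11 Scala sketch of the two pieces that have to be in place for event-time windows to fire: the time characteristic and a watermark strategy. The Event type and its ts field are placeholders.

import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Placeholder record type; `ts` stands in for your event-timestamp field.
case class Event(id: String, ts: Long)

object EventTimeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // On 1.11 this is the line that switches on watermark emission; on 1.12 it is the default.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env
      .fromElements(Event("a", 1000L), Event("a", 2000L), Event("a", 60 * 60 * 1000L))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
          .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
            override def extractTimestamp(e: Event, recordTs: Long): Long = e.ts
          }))
      .keyBy(_.id)
      .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
      .reduce((a, _) => a) // keep one event per session, just to show the windows firing
      .print()

    env.execute("event-time-sketch")
  }
}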

Cheers,
Till

On Wed, Dec 2, 2020 at 6:17 PM Simone Cavallarin 
wrote:

> Hi Till and David,
>
> First at all thanks for the quick reply, really appreciated!
>
> Drilling down to the issue:
>
>
>1. *No session is closing because there isn't a sufficiently long gap
>in the test data *-> It was the first thing that I thought, before
>asking I run a test checking the gap on my data an using a super small
>interval of 10 millis. The problem was not this.
>2. *The test runs to completion before the periodic watermark
>generator has generated any watermarks.* -> The data is flowing for
>hours and i can see everything debugging  the flow, every event was with
>all information, I'm mapping my stream and use the stream it self to
>compute the gap, so I play a while with it.
>3. *Which version of Flink are you using?* - > 1.11.2
>4. *Have you enabled event time support via
>
> StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)?
>-* > BINGO! this was the issue, as soon as I added this line on the
>code it started to work!! 🙂 (It would be nice to understand what this
>magic line is doing!)
>5. *The fact that assigner has no field to display simply means that
>it contains no fields which is normal for lambda functions-*> Thanks I
>didn't know that. noted!
>
>
> Why
> StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> is so important?
>
> Thanks
>
> s
>
>
>
> --
> *From:* David Anderson 
> *Sent:* 02 December 2020 16:01
> *To:* Simone Cavallarin 
> *Cc:* user 
> *Subject:* Re: Process windows not firing - > Can it be a Watermak issue?
>
> In cases like this one, one of the following tends to be the explanation:
>
> - No session is closing because there isn't a sufficiently long gap in the
> test data.
> - The test runs to completion before the periodic watermark generator has
> generated any watermarks.
>
> David
>
> *From:* Till Rohrmann 
> *Sent:* 02 December 2020 15:46
> *To:* Simone Cavallarin 
> *Cc:* user 
> *Subject:* Re: Process windows not firing - > Can it be a Watermak issue?
>
> Hi Simone,
>
> Which version of Flink are you using? Have you enabled event time support
> via
> StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)?
> Are you sure that the topic you are consuming contains data? Maybe you can
> share the whole job with example data with us so that we can take a look at
> it as a whole.
>
> The fact that assigner has no field to display simply means that it
> contains no fields which is normal for lambda functions.
>
> Cheers,
> Till
>
>
>


Re: Process windows not firing - > Can it be a Watermak issue?

2020-12-02 Thread Simone Cavallarin
Hi Till and David,

First at all thanks for the quick reply, really appreciated!

Drilling down to the issue:


  1.  No session is closing because there isn't a sufficiently long gap in the 
test data -> It was the first thing that I thought of; before asking I ran a test 
checking the gap in my data and using a super small interval of 10 millis. The 
problem was not this.
  2.  The test runs to completion before the periodic watermark generator has 
generated any watermarks. -> The data has been flowing for hours and I can see 
everything while debugging the flow; every event carries all its information. I'm 
mapping my stream and using the stream itself to compute the gap, so I played 
with it for a while.
  3.  Which version of Flink are you using? - > 1.11.2
  4.  Have you enabled event time support via 
StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)?
 - > BINGO! this was the issue, as soon as I added this line on the code it 
started to work!! 🙂 (It would be nice to understand what this magic line is 
doing!)
  5.  The fact that assigner has no field to display simply means that it 
contains no fields which is normal for lambda functions-> Thanks I didn't know 
that. noted!

Why 
StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 is so important?

Thanks

s




From: David Anderson 
Sent: 02 December 2020 16:01
To: Simone Cavallarin 
Cc: user 
Subject: Re: Process windows not firing - > Can it be a Watermak issue?

In cases like this one, one of the following tends to be the explanation:

- No session is closing because there isn't a sufficiently long gap in the test 
data.
- The test runs to completion before the periodic watermark generator has 
generated any watermarks.

David

From: Till Rohrmann 
Sent: 02 December 2020 15:46
To: Simone Cavallarin 
Cc: user 
Subject: Re: Process windows not firing - > Can it be a Watermak issue?

Hi Simone,

Which version of Flink are you using? Have you enabled event time support via 
StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)?
 Are you sure that the topic you are consuming contains data? Maybe you can 
share the whole job with example data with us so that we can take a look at it 
as a whole.

The fact that assigner has no field to display simply means that it contains no 
fields which is normal for lambda functions.

Cheers,
Till




Re: Uploading job jar via web UI in flink HA mode

2020-12-02 Thread sidhant gupta
Hi Till,

Thanks for the clarification and suggestions

Regards
Sidhant Gupta

On Wed, Dec 2, 2020, 10:10 PM Till Rohrmann  wrote:

> Hi Sidhant,
>
> Have you seen this discussion [1]? If you want to use S3, then you need to
> make sure that you start your Flink processes with the appropriate
> FileSystemProvider for S3 [2]. So the problem you are seeing is most likely
> caused by the JVM not knowing a S3 file system implementation.
>
> Be aware that the filesystem used by the FileUploadHandler
> is java.nio.file.FileSystem and not
> Flink's org.apache.flink.core.fs.FileSystem for which we provide different
> FileSystem implementations.
>
> My best recommendation would be to not use the web submission for Flink
> jobs when running Flink behind a load balancer. Instead use the CLI which
> will retrieve the leader address from ZooKeeper.
>
> [1]
> https://lists.apache.org/thread.html/1dbaf68bc86ad3a1acc87e4bbc2dd5fa8c7560b3f59eb256f81bc855%40%3Cuser.flink.apache.org%3E
> [2]
> https://stackoverflow.com/questions/41113119/java-nio-file-implementation-for-aws
>
> Cheers,
> Till
>
> On Wed, Dec 2, 2020 at 11:31 AM sidhant gupta  wrote:
>
>> Hi All,
>>
>> I have 2 job managers in a Flink HA mode cluster setup. I have a load
>> balancer forwarding requests to both (leader and standby) job managers
>> in the default round-robin fashion. While uploading the job jar, the Web UI
>> keeps fluctuating between the leader and standby pages. It's difficult to upload
>> the job jar, and I'm also getting errors after submitting the job, as below:
>>
>> Caused by: org.apache.flink.runtime.rest.handler.RestHandlerException:
>> Jar file /tmp/flink-web-4be8d72f-6a7d-4ec9-b71e-516e58
>> c629d5/flink-web-upload/ce291201-31e4-4e55
>> -a68b-eae1a770518d_flinkfhirmapperjob-1.2.jar does not exist
>>
>>
>> I tried using *web.upload.dir* property to use a folder location in S3
>> but getting below error:
>>
>> 2020-12-02 08:26:06,242 WARN
>> org.apache.flink.runtime.rest.FileUploadHandler  [] - File
>> upload failed.
>>
>> java.lang.IllegalArgumentException: UploadDirectory is not absolute.
>>
>> at
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> at
>> org.apache.flink.runtime.rest.handler.FileUploads.(FileUploads.java:58)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> at
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:159)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> at
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>
>> Looks like web.upload.dir only supports local paths.
>>
>>
>> Any suggestions on how to upload and submit the job jar in a Flink HA
>> cluster setup via the web UI, and also how to do it from the CLI?
>>
>>
>> Thanks and regards
>>
>> Sidhant Gupta
>>
>>
>>
>>


Re: Partitioned tables in SQL client configuration.

2020-12-02 Thread Till Rohrmann
Hi Maciek,

I am pulling in Timo who might help you with this problem.

Cheers,
Till

On Tue, Dec 1, 2020 at 6:51 PM Maciek Próchniak  wrote:

> Hello,
>
> I am trying to configure the SQL Client to query partitioned ORC data on a
> local filesystem. I have a directory structure like this:
>
> /tmp/table1/startdate=2020-11-28
>
> /tmp/table1/startdate=2020-11-27
>
> etc.
>
>
> If I run SQL Client session and create table by hand:
>
> create table tst (column1 string, startdate string) partitioned by
> (startdate) with ('connector'='filesystem', 'format'='orc',
> 'path'='/tmp/table1');
>
> everything runs fine:
>
> explain select * from tst where startdate='2020-11-27'
>
> shows only one partition in 'readPartitions'
>
>
> However, I struggle to configure the table in the .yaml config.
>
> I tried it like this (after some struggle, as the "partition.keys" setting
> doesn't seem to be documented...):
>
> tables:
>- name: tst2
>  type: source-table
>  connector: filesystem
>  path: "/tmp/table1"
>  format: orc
>  partition.keys:
>- name: startdate
>  schema:
>- name: column1
>  data-type: string
>- name: startdate
>  data-type: string
>
> and it more or less works - queries are executed properly. However,
> partitions are not pruned:
>
> explain select * from tst2 where startdate='2020-11-27'
>
> shows all partitions in 'readPartitions'
>
>
> Any idea what can be wrong? I'm using Flink 1.11.2
>
>
> thanks,
>
> maciek
>
>
>


Re: Tracking ID in log4j MDC

2020-12-02 Thread Till Rohrmann
Hi Anil,

Flink does not maintain the MDC context between threads. Hence, I don't
think that it is possible w/o changes to Flink.

One note: if operators are chained, then they are run by the same thread.

Cheers,
Till

On Wed, Dec 2, 2020 at 7:22 AM Anil K  wrote:

> Hi All,
>
> Is it possible to have a tracking id in the MDC that will be shared across
> chained user-defined operations like Filter, KeySelector, FlatMap,
> ProcessFunction, and Producer?
>
> The tracking id will be read from the headers of the Kafka message, which I
> plan, if possible, to set in the MDC in log4j. Right now I am seeing that the
> tracking id is not getting propagated to the next function.
>
> I am using Flink 1.9 running in k8s.
>
> Thanks, Anil
>


Re: Uploading job jar via web UI in flink HA mode

2020-12-02 Thread Till Rohrmann
Hi Sidhant,

Have you seen this discussion [1]? If you want to use S3, then you need to
make sure that you start your Flink processes with the appropriate
FileSystemProvider for S3 [2]. So the problem you are seeing is most likely
caused by the JVM not knowing a S3 file system implementation.

Be aware that the filesystem used by the FileUploadHandler
is java.nio.file.FileSystem and not
Flink's org.apache.flink.core.fs.FileSystem for which we provide different
FileSystem implementations.

My best recommendation would be to not use the web submission for Flink
jobs when running Flink behind a load balancer. Instead use the CLI which
will retrieve the leader address from ZooKeeper.

[1]
https://lists.apache.org/thread.html/1dbaf68bc86ad3a1acc87e4bbc2dd5fa8c7560b3f59eb256f81bc855%40%3Cuser.flink.apache.org%3E
[2]
https://stackoverflow.com/questions/41113119/java-nio-file-implementation-for-aws

Cheers,
Till

On Wed, Dec 2, 2020 at 11:31 AM sidhant gupta  wrote:

> Hi All,
>
> I have 2 job managers in a Flink HA mode cluster setup. I have a load
> balancer forwarding requests to both job managers (leader and standby) in
> the default round-robin fashion. While uploading the job jar, the Web UI
> fluctuates between the leader and standby pages. It's difficult to upload
> the job jar, and I also get errors after submitting the job, as below:
>
> Caused by: org.apache.flink.runtime.rest.handler.RestHandlerException: Jar
> file /tmp/flink-web-4be8d72f-6a7d-4ec9-b71e-516e58
> c629d5/flink-web-upload/ce291201-31e4-4e55
> -a68b-eae1a770518d_flinkfhirmapperjob-1.2.jar does not exist
>
>
> I tried using the *web.upload.dir* property to point to a folder location
> in S3, but I get the error below:
>
> 2020-12-02 08:26:06,242 WARN
> org.apache.flink.runtime.rest.FileUploadHandler  [] - File
> upload failed.
>
> java.lang.IllegalArgumentException: UploadDirectory is not absolute.
>
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.rest.handler.FileUploads.<init>(FileUploads.java:58)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:159)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
>
>
> Looks like web.upload.dir only supports local paths.
>
>
> Any suggestions on how to upload and submit the job jar in a Flink HA
> cluster setup via the web UI, and also how to do it from the CLI?
>
>
> Thanks and regards
>
> Sidhant Gupta
>
>
>
>


Routing events to different kafka topics dynamically

2020-12-02 Thread Prasanna kumar
Hi,

Events need to be routed to different Kafka topics dynamically, based upon
some info in the message.

We have implemented this using KeyedSerializationSchema, similar to
https://stackoverflow.com/questions/49508508/apache-flink-how-to-sink-events-to-different-kafka-topics-depending-on-the-even.
But it's deprecated and we cannot use it in production.
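
For context, a minimal sketch of that (now deprecated) KeyedSerializationSchema
approach, using an illustrative Tuple2 whose first field carries the target
topic name and whose second field carries the payload:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class RoutingSerializationSchema
        implements KeyedSerializationSchema<Tuple2<String, String>> {

    @Override
    public byte[] serializeKey(Tuple2<String, String> element) {
        return null; // records are written without a key
    }

    @Override
    public byte[] serializeValue(Tuple2<String, String> element) {
        return element.f1.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Tuple2<String, String> element) {
        // the per-record routing decision; the producer's default topic is
        // only used when this returns null
        return element.f0;
    }
}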

I looked at the alternative, KafkaSerializationSchema, but I do not see such
an option there.

Then I stumbled upon this thread:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Usage-of-KafkaDeserializationSchema-and-KafkaSerializationSchema-td32347.html
which asks us to use KafkaContextAware.

Is there a more intuitive/easier way to do the same?

Thanks,
Prasanna.


Re: test (harness and minicluster)

2020-12-02 Thread Till Rohrmann
Hi Martin,

In general, Flink's MiniCluster should be able to run every complete Flink
JobGraph. However, from what I read you are looking for a test harness for
a ProcessWindowFunction so that you can test this function in a more
unit-test style, right? What you can do is to use the
OneInputStreamOperatorTestHarness and initialize it with a WindowOperator
to which you pass the ProcessWindowFunction. That way you should be able to
test the ProcessWindowFunction. Please also take a look
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest
[1] for some examples of how it could be done.

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
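
For illustration, a rough sketch of that wiring, modelled on WindowOperatorTest
(it needs the flink-streaming-java test-jar on the classpath, and the
WindowOperator constructor arguments may differ slightly between Flink
versions):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;

public class ProcessWindowFunctionHarnessSketch {

    // the function under test: counts elements per key and window
    static class CountFunction
            extends ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx,
                            Iterable<Tuple2<String, Integer>> elements, Collector<String> out) {
            int count = 0;
            for (Tuple2<String, Integer> ignored : elements) {
                count++;
            }
            out.collect(key + ":" + count);
        }
    }

    public static void main(String[] args) throws Exception {
        TypeSerializer<Tuple2<String, Integer>> inputSerializer =
                new TupleTypeInfo<Tuple2<String, Integer>>(
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
                    .createSerializer(new ExecutionConfig());
        KeySelector<Tuple2<String, Integer>, String> keySelector = t -> t.f0;

        WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, String, TimeWindow> operator =
                new WindowOperator<>(
                        TumblingEventTimeWindows.of(Time.seconds(10)),
                        new TimeWindow.Serializer(),
                        keySelector,
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                        new ListStateDescriptor<>("window-contents", inputSerializer),
                        new InternalIterableProcessWindowFunction<>(new CountFunction()),
                        EventTimeTrigger.create(),
                        0L,
                        null /* no late-data side output */);

        KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Integer>, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);

        harness.open();
        harness.processElement(new StreamRecord<>(Tuple2.of("a", 1), 1_000L));
        harness.processElement(new StreamRecord<>(Tuple2.of("a", 1), 2_000L));
        // advancing the watermark past the window end makes the window fire
        harness.processWatermark(new Watermark(10_000L));
        System.out.println(harness.getOutput()); // contains "a:2" followed by the watermark
        harness.close();
    }
}

Advancing the watermark past the window end is what actually triggers the
ProcessWindowFunction, which mirrors how the operator behaves in a real job.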

Cheers,
Till

On Wed, Dec 2, 2020 at 12:04 PM Martin Frank Hansen 
wrote:

> Hi,
>
> I am trying to make a test-suite for our flink cluster using harness and
> minicluster. As we are using processWindowFunctions in our pipeline we need
> some good ways of validating these functions. To my surprise,
> processWindowFunctions are supported by neither the test-harness nor the
> minicluster setups, so does anyone know if they will be supported in the
> future? (Or, if they are supported, how should I use them?)
>
> Furthermore does anyone have some good ideas for a test-setup for
> processWindowFunctions?
>
> best regards
>
>
> Martin Frank Hansen
>
> Data Engineer
>
>


Re: Application Mode support on VVP v2.3

2020-12-02 Thread Till Rohrmann
Hi Narasimha,

thanks for reaching out to the community. I am not entirely sure whether
VVP 2.3 supports the application mode. Since this is a rather new feature,
it could be that it has not been integrated yet. I am pulling in Ufuk and
Fabian, who should be able to answer your question definitively. Maybe they
can also tell us whether this is planned to change in the foreseeable
future.

Cheers,
Till

On Wed, Dec 2, 2020 at 4:33 PM narasimha  wrote:

> Hi,
>
> I am using the Ververica platform to deploy Flink jobs and found that it
> does not support the application deployment mode.
> I just want to check whether this is expected.
>
> Below is a brief outline of how the main method has been composed.
>
>
> class Job1 {
> public void execute(){
> StreamExecutionEnvironment env = ...
>
> env.execute("Job 1");
> }
> }
>
> class Job2 {
> public void execute(){
> StreamExecutionEnvironment env = ...
>
> env.execute("Job 2");
> }
> }
>
>
> class Main {
>
> public static void main(String[] args) {
> new Job1().execute();
> new Job2().execute();
> }
> }
>
> Does this make any difference?
>
> TIA
> --
> A.Narasimha Swamy
>


Application Mode support on VVP v2.3

2020-12-02 Thread narasimha
Hi,

I am using the Ververica platform to deploy Flink jobs and found that it does
not support the application deployment mode.
I just want to check whether this is expected.

Below is a brief outline of how the main method has been composed.


class Job1 {
public void execute(){
StreamExecutionEnvironment env = ...

env.execute("Job 1");
}
}

class Job2 {
public void execute(){
StreamExecutionEnvironment env = ...

env.execute("Job 2");
}
}


class Main {

public static void main(String[] args) {
new Job1().execute();
new Job2().execute();
}
}

Does this make any difference?

TIA
-- 
A.Narasimha Swamy


test (harness and minicluster)

2020-12-02 Thread Martin Frank Hansen
Hi,

I am trying to make a test-suite for our flink cluster using harness and
minicluster. As we are using processWindowFunctions in our pipeline we need
some good ways of validating these functions. To my surprise,
processWindowFunctions are supported by neither the test-harness nor the
minicluster setups, so does anyone know if they will be supported in the
future? (Or, if they are supported, how should I use them?)

Furthermore does anyone have some good ideas for a test-setup for
processWindowFunctions?

best regards


Martin Frank Hansen

Data Engineer


Uploading job jar via web UI in flink HA mode

2020-12-02 Thread sidhant gupta
Hi All,

I have 2 job managers in a Flink HA mode cluster setup. I have a load
balancer forwarding requests to both job managers (leader and standby) in the
default round-robin fashion. While uploading the job jar, the Web UI
fluctuates between the leader and standby pages. It's difficult to upload
the job jar, and I also get errors after submitting the job, as below:

Caused by: org.apache.flink.runtime.rest.handler.RestHandlerException: Jar
file /tmp/flink-web-4be8d72f-6a7d-4ec9-b71e-516e58
c629d5/flink-web-upload/ce291201-31e4-4e55
-a68b-eae1a770518d_flinkfhirmapperjob-1.2.jar does not exist


I tried using the *web.upload.dir* property to point to a folder location in
S3, but I get the error below:

2020-12-02 08:26:06,242 WARN
org.apache.flink.runtime.rest.FileUploadHandler  [] - File
upload failed.

java.lang.IllegalArgumentException: UploadDirectory is not absolute.

at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

at
org.apache.flink.runtime.rest.handler.FileUploads.<init>(FileUploads.java:58)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:159)
[flink-dist_2.11-1.11.2.jar:1.11.2]

at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
[flink-dist_2.11-1.11.2.jar:1.11.2]

at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
[flink-dist_2.11-1.11.2.jar:1.11.2]


Looks like web.upload.dir only supports local paths.


Any suggestions on how to upload and submit the job jar in a Flink HA cluster
setup via the web UI, and also how to do it from the CLI?


Thanks and regards

Sidhant Gupta


Re: Performance consequence of leftOuterJoinLateral

2020-12-02 Thread Rex Fenley
Yes, exactly. Thanks!

On Tue, Dec 1, 2020 at 6:11 PM Danny Chan  wrote:

> Hi, Rex ~
>
> For "leftOuterJoinLateral", do you mean joining a table function through a
> lateral table?
> If so, yes, the complexity is O(1) for each probe key of the LHS. The table
> function evaluates the extra columns and appends them to the left columns.
>
> On Wed, Dec 2, 2020 at 7:54 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I'm curious if there's any performance consequence of using a
>> TableFunction + leftOuterJoinLateral to create some new columns vs creating
>> each column individually?
>>
>> I'm hoping that lookup for a row with leftOuterJoinLateral is essentially
>> O(1), so as soon as the TableFunction is done it just appends the new
>> column values to the row.
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com
>>
>
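
For reference, a minimal sketch of the lateral-table pattern discussed above
(function, table, and column names are illustrative, not from this thread):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class LateralJoinSketch {

    // emits exactly one row of derived columns per input row, so the
    // left outer lateral join stays 1:1 with the left-hand side
    @FunctionHint(output = @DataTypeHint("ROW<upper_name STRING, name_length INT>"))
    public static class EnrichFunction extends TableFunction<Row> {
        public void eval(String name) {
            collect(Row.of(name.toUpperCase(), name.length()));
        }
    }

    // assumes `input` has a STRING column called `name`
    public static Table enrich(Table input) {
        return input
            .leftOuterJoinLateral(call(EnrichFunction.class, $("name")))
            .select($("name"), $("upper_name"), $("name_length"));
    }
}

Because eval emits exactly one row per input row, the result keeps the same
row count as the left-hand side, with the derived columns appended.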

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com