Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Timo Walther

Hi Dylan,

I'm assuming you are using Flink 1.12 and the Blink planner?

Beginning from 1.12 you can use the "new" aggregate functions with a 
better type inference. So TypeInformation will not be used in this stack.


I tried to come up with an example that should explain the rough design. 
I will include this example in the Flink code base. I hope this helps:




import java.time.LocalDate;
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;

public static class LastIfNotNull<T>
        extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
        if (input != null) {
            acc.value = input;
            acc.date = date;
        }
    }

    @Override
    public Row getValue(Accumulator<T> acc) {
        return Row.of(acc.value, acc.date);
    }

    @Override
    public Accumulator<T> createAccumulator() {
        return new Accumulator<>();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .inputTypeStrategy(
                        InputTypeStrategies.sequence(
                                InputTypeStrategies.ANY,
                                InputTypeStrategies.explicit(DataTypes.DATE())))
                .accumulatorTypeStrategy(
                        callContext -> {
                            DataType accDataType =
                                    DataTypes.STRUCTURED(
                                            Accumulator.class,
                                            DataTypes.FIELD(
                                                    "value",
                                                    callContext.getArgumentDataTypes().get(0)),
                                            DataTypes.FIELD("date", DataTypes.DATE()));

                            return Optional.of(accDataType);
                        })
                .outputTypeStrategy(
                        callContext -> {
                            DataType argDataType =
                                    callContext.getArgumentDataTypes().get(0);

                            DataType outputDataType =
                                    DataTypes.ROW(
                                            DataTypes.FIELD("value", argDataType),
                                            DataTypes.FIELD("date", DataTypes.DATE()));

                            return Optional.of(outputDataType);
                        })
                .build();
    }
}
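
A minimal usage sketch, assuming a table "t" with columns k (STRING), v (BIGINT) 
and d (DATE) has been registered elsewhere (the table and column names are 
illustrative only):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LastIfNotNullExample {
    public static void main(String[] args) {
        // Blink planner, streaming mode (the default in 1.12).
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // One registration covers all argument types; the custom TypeInference
        // above resolves the concrete value type per call site.
        tEnv.createTemporarySystemFunction("LastIfNotNull", LastIfNotNull.class);

        // Assumes a table "t" with columns k (STRING), v (BIGINT), d (DATE).
        tEnv.executeSql("SELECT k, LastIfNotNull(v, d) AS latest FROM t GROUP BY k");
    }
}
```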

Regards,
Timo



On 20.01.21 01:04, Dylan Forciea wrote:
I am attempting to create an aggregate UDF that takes a generic 
parameter T, but for the life of me, I can’t seem to get it to work.


The UDF I’m trying to implement takes two input arguments, a value that 
is generic, and a date. It will choose the non-null value with the 
latest associated date. I had originally done this with separate Top 1 
queries connected with a left join, but the memory usage seems far 
higher than doing this with a custom aggregate function.
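
Roughly, each of those Top 1 queries looked like the following pattern, stitched 
together afterwards with left joins (a sketch; table and column names are 
illustrative, not the actual schema):

```sql
-- Sketch of one Top 1 query per value column; src, id, val and update_date
-- are illustrative names.
SELECT id, val, update_date
FROM (
  SELECT id, val, update_date,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_date DESC) AS rn
  FROM src
  WHERE val IS NOT NULL
)
WHERE rn = 1
```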


As a first attempt, I tried to use custom type inference to have it 
validate that the first argument type is the output type so I could have a 
single function, and also used DataTypes.STRUCTURED to try to define the 
shape of my accumulator. However, that resulted in an exception like 
this whenever I tried to use a non-string value as the first argument:


[error] Caused by: java.lang.ClassCastException: java.lang.Long cannot 
be cast to java.lang.String


[error]   at 
io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown 
Source)


[error]   at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)


[error]   at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)


[error]   at 
org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)


[error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)

[error]   at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)


[error]   at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)


[error]   at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)


[error]   at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)


[error]   at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)


[error]   at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)


[error]   at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)


[error]   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInp

Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB

2021-01-20 Thread David Haglund
I have encountered a problem in Flink when trying to upgrade from Flink 1.9.1 to
Flink 1.11.3.

The problem is a combination of 2 components:

* Keys implemented as case classes in Scala where we override the equals and
  hashCode methods. The case class has additional fields which are not used in
  the keyBy (hashCode/equals) but can have different values for a specific key
  (the fields we care about).
* Checkpointing with RocksDB

In Flink 1.9.1 everything worked fine, but in Flink 1.11.3 we got aggregations
for each unique key including the parameters which we did not want to include in
the keyBy, and which we explicitly do not use in hashCode and equals. It looks like
hashCode is ignored in the keyBy in our case when we use RocksDB for
checkpoints.
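
A condensed sketch of the setup (class, field and value names are illustrative, 
not our production code):

```scala
import org.apache.flink.streaming.api.scala._

// Only sensorId is meant to define key equality; "extra" is deliberately
// excluded from hashCode/equals but may differ between records of the same key.
case class SensorKey(sensorId: String, extra: String) {
  override def hashCode(): Int = sensorId.hashCode
  override def equals(obj: Any): Boolean = obj match {
    case other: SensorKey => sensorId == other.sensorId
    case _                => false
  }
}

object KeyByProblemSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Two elements that are equal per equals/hashCode but differ in "extra".
    val events = env.fromElements(
      (SensorKey("s1", "a"), 1L),
      (SensorKey("s1", "b"), 2L))

    // Expected: a single running aggregate of 3 for key s1. With the RocksDB
    // state backend (1.10+) we instead observe two separate aggregates.
    events
      .keyBy(_._1)
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()

    env.execute("keyBy hashCode/equals sketch")
  }
}
```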

We do not see this problem if we disable checkpointing or when using
FsStateBackend.

I have seen this with "Incremental Window Aggregation with AggregateFunction"
[1], but a colleague of mine reported he had seen the same issue with
KeyedProcessFunction too.

We are using Scala version 2.11.12 and Java 8.

This looks like a bug to me. Is it a known issue or a new one?

Best regards,
/David Haglund

[1] Incremental Window Aggregation with AggregateFunction
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction


David Haglund
Systems Engineer
Fleet Perception for Maintenance
NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping
Sweden
Mobile: +46 705 634 848
david.hagl...@niradynamics.se
www.niradynamics.se
Together for smarter safety



Re: Error querying flink state

2021-01-20 Thread Till Rohrmann
Hi Falak,

it is hard to tell what is going wrong w/o the debug logs. Could you check
whether they contain anything specific? You can also share them with us.

Cheers,
Till

On Wed, Jan 20, 2021 at 1:04 PM Falak Kansal 
wrote:

> Hi,
>
> Thank you so much for the response. I am using the 1.12 version and after
> configurational changes I am able to query the state.
>
> Although the issue I am facing now is that I am able to query the state of the
> first submitted job only. Later on, if I query the state of a different job,
> I see the same exception. I made sure I am using a different state name
> for the next submitted job and I am using the correct jobId in the query.
>
>
> Thank you
> Falak
>
> On Mon, Jan 18, 2021 at 11:28 PM Till Rohrmann 
> wrote:
>
>> Hi Falak,
>>
>> Which version of Flink are you using? Providing us with the debug logs
>> could also help understanding what's going wrong.
>>
>> I guess that you have copied the flink-queryable-state-runtime jar into
>> the lib directory and set queryable-state.enable: true in the
>> configuration, right? Here is the link to the documentation for queryable
>> state [1] for more details.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>>
>> Cheers,
>> Till
>>
>> On Thu, Jan 14, 2021 at 1:18 PM Falak Kansal 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> I have set up a Flink cluster on my local machine. I created a Flink job
>>> (TrackMaximumTemperature) and made the state queryable. I am using the
>>> *github/streamingwithflink/chapter7/QueryableState.scala* example from the
>>> *https://github.com/streaming-with-flink* repository. Please find the
>>> file attached.
>>>
>>> Now I have the running job id, and when I go and try to access the state,
>>> it throws an exception. I see the job is running and I am using the correct
>>> jobId. Also, checkpointing is enabled in the original job and I have set the
>>> properties related to checkpointing in flink-conf.yaml. Am I
>>> missing something? Any leads will be appreciated. Thank you :)
>>>
>>>
>>> *Exception stack trace:*
>>> Caused by:
>>> org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could
>>> not retrieve location of state=maxTemperature of
>>> job=9a528bf3e1b650aed7e0b1e26d038ad5. Potential reasons are: i) the state
>>> is not ready, or ii) the job does not exist.
>>> at
>>> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>>> at
>>> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>>> at
>>> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>>> at
>>> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>>> at
>>> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>>> at
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>
>>


Re: Publishing a table to Kafka

2021-01-20 Thread Abhishek Rai
Thanks Leonard, we are working towards 1.12 upgrade and should be able
to try upsert-kafka after that.

> Your first workaround should have worked, but it looks like an exception
> was thrown in the type conversion phase. Could you share your table schema and
> query that can reproduce the issue?

I was able to get past this but ran into another issue which is
detailed further down.  My table schema is:
Table "T0"
- column "C0" (int64) [rowtime attribute]
- column "C1" (string)

Query:
select INTERVAL '1' HOUR as E0, * from T0

In the original code that I posted, the failure happens at:
```
var rowType = table.getSchema().toRowType();
```

I got past this by bridging duration types to long/int before
converting to TypeInformation using
`TypeConversions.fromDataTypeToLegacyInfo`:

```
  var dataType = schema.getFieldDataTypes()[i];
  var typeRoot = dataType.getLogicalType().getTypeRoot();
  if (typeRoot.equals(LogicalTypeRoot.INTERVAL_DAY_TIME)) {
dataType = dataType.bridgedTo(Long.class);
  }
  if (typeRoot.equals(LogicalTypeRoot.INTERVAL_YEAR_MONTH)) {
dataType = dataType.bridgedTo(Integer.class);
  }
```
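
For context, the full conversion ends up looking roughly like this (a sketch; 
`toLegacyRowType` is a made-up helper name, and `schema` is assumed to be the 
table's TableSchema as in the snippet above):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.utils.TypeConversions;

public final class IntervalBridging {
    // Converts a TableSchema to a legacy RowTypeInfo, bridging interval types
    // to long/int first so fromDataTypeToLegacyInfo does not reject them.
    static RowTypeInfo toLegacyRowType(TableSchema schema) {
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[schema.getFieldCount()];
        for (int i = 0; i < schema.getFieldCount(); i++) {
            DataType dataType = schema.getFieldDataTypes()[i];
            LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
            if (typeRoot.equals(LogicalTypeRoot.INTERVAL_DAY_TIME)) {
                dataType = dataType.bridgedTo(Long.class);
            }
            if (typeRoot.equals(LogicalTypeRoot.INTERVAL_YEAR_MONTH)) {
                dataType = dataType.bridgedTo(Integer.class);
            }
            fieldTypes[i] = TypeConversions.fromDataTypeToLegacyInfo(dataType);
        }
        return new RowTypeInfo(fieldTypes, schema.getFieldNames());
    }
}
```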

After getting past this, the next problem I've now run into is as
follows.  Like I noted above, I'm converting from Table API to
DataStream API.  We are now seeing the following error at runtime:

```
java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to
class java.lang.Long (java.sql.Timestamp is in module java.sql of
loader 'platform'; java.lang.Long is in module java.base of loader
'bootstrap')
```

This error comes from the rowtime attribute column, which our custom
table source generates as `java.sql.Timestamp` objects.  That worked
out OK when we were operating entirely at the Table API level.

Our guess is that after conversion to DataStream API, the rowtime
attribute column uses `TimeIndicatorTypeInfo` which is serialized as
long.  Hence the error converting from `Timestamp` to `Long`.

In our case, we would like to continue using the unmodified table
source (generating Timestamp type for the rowtime attribute column),
hence this approach also seems to have hit a dead end.  We are now
planning to try out the upsert-kafka sink following 1.12 upgrade.
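
For reference, the sink we plan to try would look roughly like this (a sketch; 
the topic, brokers, primary key choice and formats are illustrative):

```sql
-- Sketch only: topic, brokers, primary key and formats are illustrative.
CREATE TABLE t0_sink (
  C0 BIGINT,
  C1 STRING,
  PRIMARY KEY (C1) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 't0-compacted',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```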

Thanks,
Abhishek

On Fri, Jan 15, 2021 at 3:50 AM Leonard Xu  wrote:
>
> Hi, Rai
>
> What are my options to still write to Kafka?  I don't mind introducing
> another boolean/etc field in the Kafka output records containing the
> row kind or similar info.
>
>
> The recommended way is to use the `upsert-kafka`[1] connector, with which you can write
> insert/update/retract messages to a
> compacted Kafka topic and read insert/update/retract messages from this topic
> as well. It’s a new feature since 1.12;
> there are no options to control writing boolean/etc fields before the 1.12 version,
> because the boolean flag (rowkind) is not exposed to users.
>
>
> The first workaround that I tried is to convert the table to
> ```
> TableException: Unsupported conversion from data type 'INTERVAL
> SECOND(3) NOT NULL' (conversion class: java.time.Duration) to type
> information. Only data types that originated from type information
> fully support a reverse conversion.
> ```
>
>
> Your first workaround should have worked, but it looks like an exception
> was thrown in the type conversion phase. Could you share your table schema and
> query that can reproduce the issue?
>
> Best,
> Leonard
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/upsert-kafka.html


Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. What 
you proposed is roughly what I had come up with the first time around, which 
resulted in the stack trace with the ClassCastException I had originally 
included. I saw that you had used a Row instead of just the value in your 
example, but changing it that way didn't seem to help, which makes sense since 
the problem seems to be in the code generated for the accumulator Converter and 
not the output.

Here is the exact code that caused that error (while calling LatestNonNullLong):

The registration of the below:
env.createTemporarySystemFunction("LatestNonNullLong", 
classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", 
classOf[LatestNonNull[String]])


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}

case class LatestNonNullAccumulator[T](
    var value: T = null.asInstanceOf[T],
    var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
    LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
    acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
    if (value != null) {
      Option(acc.date).fold {
        acc.value = value
        acc.date = date
      } { accDate =>
        if (date != null && date.isAfter(accDate)) {
          acc.value = value
          acc.date = date
        }
      }
    }
  }

  def merge(
      acc: LatestNonNullAccumulator[T],
      it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
    val iter = it.iterator()
    while (iter.hasNext) {
      val a = iter.next()
      if (a.value != null) {
        Option(acc.date).fold {
          acc.value = a.value
          acc.date = a.date
        } { accDate =>
          Option(a.date).map { curDate =>
            if (curDate.isAfter(accDate)) {
              acc.value = a.value
              acc.date = a.date
            }
          }
        }
      }
    }
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
    acc.value = null.asInstanceOf[T]
    acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
    TypeInference
      .newBuilder()
      .inputTypeStrategy(
        InputTypeStrategies.sequence(
          InputTypeStrategies.ANY,
          InputTypeStrategies.explicit(DataTypes.DATE())))
      .accumulatorTypeStrategy { callContext =>
        val accDataType = DataTypes.STRUCTURED(
          classOf[LatestNonNullAccumulator[T]],
          DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
          DataTypes.FIELD("date", DataTypes.DATE()))

        Optional.of(accDataType)
      }
      .outputTypeStrategy { callContext =>
        val outputDataType = callContext.getArgumentDataTypes().get(0)
        Optional.of(outputDataType)
      }
      .build()
  }
}

Regards,
Dylan Forciea

On 1/20/21, 2:37 AM, "Timo Walther"  wrote:

Hi Dylan,

I'm assuming your are using Flink 1.12 and the Blink planner?

Beginning from 1.12 you can use the "new" aggregate functions with a 
better type inference. So TypeInformation will not be used in this stack.

I tried to come up with an example that should explain the rough design. 
I will include this example into the Flink code base. I hope this helps:



import org.apache.flink.table.types.inference.InputTypeStrategies;

public static class LastIfNotNull
 extends AggregateFunction> {

 public static class Accumulator {
 public T value;
 public LocalDate date;
 }

 public void accumulate(Accumulator acc, T input, LocalDate date) {
 if (input != null) {
 acc.value = input;
 acc.date = date;
 }
 }

 @Override
 public Row getValue(Accumulator acc) {
 return Row.of(acc.value, acc.date);
 }

 @Override
 public Accumulator createAccumulator() {
 return new Accumulator<>();
 }

 @Override
 public TypeInference getTypeInference(DataTypeFactory typeFactory) {
 return TypeInference.newBuilder()
 .inputTypeStrategy(
 InputTypeStrategies.sequence(
 InputTypeStrategies.ANY,

InputTypeStrategies.explicit(DataTypes.DATE(
 .accumulatorTypeStrategy(
 callContext -> {
 DataType accDataType =

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
As a side note, I also just tried to unify into a single function registration 
and used _ as the type parameter in the classOf calls there and within the 
TypeInference definition for the accumulator and still ended up with the exact 
same stack trace.

Dylan

On 1/20/21, 9:22 AM, "Dylan Forciea"  wrote:

Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink planner. 
What you proposed is roughly what I had come up with the first time around that 
resulted in the stack trace with the ClassCastException I had originally 
included. I saw that you had used a Row instead of just the value in our 
example, but changing it that way didn't seem to help, which makes sense since 
the problem seems to be in the code generated for the accumulator Converter and 
not the output. 

Here is the exact code that caused that error (while calling 
LatestNonNullLong):

The registration of the below:
env.createTemporarySystemFunction("LatestNonNullLong", 
classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", 
classOf[LatestNonNull[String]])


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, 
TypeInference}

case class LatestNonNullAccumulator[T](
var value: T = null.asInstanceOf[T],
var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, 
LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: 
LocalDate): Unit = {
if (value != null) {
  Option(acc.date).fold {
acc.value = value
acc.date = date
  } { accDate =>
if (date != null && date.isAfter(accDate)) {
  acc.value = value
  acc.date = date
}
  }
}
  }

  def merge(
  acc: LatestNonNullAccumulator[T],
  it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
val iter = it.iterator()
while (iter.hasNext) {
  val a = iter.next()
  if (a.value != null) {
Option(acc.date).fold {
  acc.value = a.value
  acc.date = a.date
} { accDate =>
  Option(a.date).map { curDate =>
if (curDate.isAfter(accDate)) {
  acc.value = a.value
  acc.date = a.date
}
  }
}
  }
}
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
acc.value = null.asInstanceOf[T]
acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): 
TypeInference = {
TypeInference
  .newBuilder()
  .inputTypeStrategy(InputTypeStrategies
.sequence(InputTypeStrategies.ANY, 
InputTypeStrategies.explicit(DataTypes.DATE(
  .accumulatorTypeStrategy { callContext =>
val accDataType = DataTypes.STRUCTURED(
  classOf[LatestNonNullAccumulator[T]],
  DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
  DataTypes.FIELD("date", DataTypes.DATE()))

Optional.of(accDataType)
  }
  .outputTypeStrategy { callContext =>
val outputDataType = callContext.getArgumentDataTypes().get(0);
Optional.of(outputDataType);
  }
  .build()
  }
}

Regards,
Dylan Forciea

On 1/20/21, 2:37 AM, "Timo Walther"  wrote:

Hi Dylan,

I'm assuming your are using Flink 1.12 and the Blink planner?

Beginning from 1.12 you can use the "new" aggregate functions with a 
better type inference. So TypeInformation will not be used in this 
stack.

I tried to come up with an example that should explain the rough 
design. 
I will include this example into the Flink code base. I hope this helps:



import org.apache.flink.table.types.inference.InputTypeStrategies;

public static class LastIfNotNull
 extends AggregateFunction> {

 public static class Accumulator {
 public T value;
 public LocalDate date;
 }

 public void accumulate(Accumulator acc, T input, LocalDate 
date) {
 if (input != null) {
 acc.value = input;
 acc.date = date;
 }
 }

 @Override
 public Row getValu

Re: Counter metrics for prometheus having unexpected gaps in grafana

2021-01-20 Thread Chesnay Schepler
Not sure whether it would solve your issue, but you could maybe exclude 
the pod id (I assume you mean the host?) from being reported by setting 
this:

metrics.reporter.<reporter>.scope.variables.excludes: <variables>



On 1/20/2021 7:16 AM, Manish G wrote:

Hi All,

I am facing an issue with counter metrics I have added to a flatmap 
function.
My application is deployed in Kubernetes, and hence the Prometheus 
metrics generated have the pod id as one of their labels. Now if a pod dies and 
a new pod comes up, we have a brand new metric starting from 0.

As a result, many of the graphs in Grafana have unexpected gaps.

Can I somehow avoid it?





Handling validations/errors in the flink job

2021-01-20 Thread sagar
Hi Team,


I am creating a Flink job with the DataStream API in batch mode.

It has 5 different bounded sources and I need to perform some
business operations on them, like joining, aggregating etc.



I am using a CoGroup operator to join two streams as it serves as a left
join. So when keys are present in both streams, I am processing and
moving ahead.

But when a key is present in only one of the streams, I need to send it as an error.



Some operators like Process have side output features, but CoGroup doesn't
have that feature.



In order to report missing data to a different stream, I am planning to
create one common error-handling stream, and at each CoGroup operation I am
planning to write to the error stream by using a Split operator after the CoGroup
(roughly as in the sketch below).
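
A sketch of what this could look like (element types and names are illustrative; 
a downstream ProcessFunction with a side output stands in for the deprecated 
split()):

```java
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class CoGroupErrorRouting {

    // Left-join style co-group: tag each record instead of using a side output,
    // because CoGroupFunction has no side-output facility.
    public static class TaggingCoGroup
            implements CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<Boolean, String>> {
        @Override
        public void coGroup(
                Iterable<Tuple2<String, Long>> left,
                Iterable<Tuple2<String, Long>> right,
                Collector<Tuple2<Boolean, String>> out) {
            boolean hasRight = right.iterator().hasNext();
            for (Tuple2<String, Long> l : left) {
                if (hasRight) {
                    out.collect(Tuple2.of(false, "joined: " + l.f0));                   // normal result
                } else {
                    out.collect(Tuple2.of(true, "missing right side for key " + l.f0)); // error
                }
            }
        }
    }

    // Errors are routed to a side output by a ProcessFunction right after the CoGroup.
    public static final OutputTag<String> ERRORS = new OutputTag<String>("errors") {};

    public static SingleOutputStreamOperator<String> route(DataStream<Tuple2<Boolean, String>> tagged) {
        return tagged.process(new ProcessFunction<Tuple2<Boolean, String>, String>() {
            @Override
            public void processElement(Tuple2<Boolean, String> value, Context ctx, Collector<String> out) {
                if (value.f0) {
                    ctx.output(ERRORS, value.f1);   // common error stream
                } else {
                    out.collect(value.f1);          // main result stream
                }
            }
        });
    }
    // Usage: route(taggedStream).getSideOutput(ERRORS) yields the error stream.
}
```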



Let me know if that is the correct way of handling the errors?

-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not the
intended recipient please ignore this email.


Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Oh, I think I might have a clue as to what is going on. I notice that it will 
work properly when I only call it on Long. I think that it is using the same 
generated code for the Converter for whatever was called first.

Since in Scala I can't declare an object as static within the class itself, I 
wonder if it won't generate appropriate Converter code per subtype. I tried 
creating a subclass that is specific to the type within my class and returning 
that as the accumulator, but that didn't help. And, I can't refer to that class 
in the TypeInference since it isn't static and I get an error from Flink 
because of that. I'm going to see if writing this UDF in Java with an 
embedded public static class like you have will solve my problems. I'll 
report back to let you know what I find. If that works, I'm not quite sure how 
to make it work in Scala.

Regards,
Dylan Forciea

On 1/20/21, 9:34 AM, "Dylan Forciea"  wrote:

As a side note, I also just tried to unify into a single function 
registration and used _ as the type parameter in the classOf calls there and 
within the TypeInference definition for the accumulator and still ended up with 
the exact same stack trace.

Dylan

On 1/20/21, 9:22 AM, "Dylan Forciea"  wrote:

Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink 
planner. What you proposed is roughly what I had come up with the first time 
around that resulted in the stack trace with the ClassCastException I had 
originally included. I saw that you had used a Row instead of just the value in 
our example, but changing it that way didn't seem to help, which makes sense 
since the problem seems to be in the code generated for the accumulator 
Converter and not the output. 

Here is the exact code that caused that error (while calling 
LatestNonNullLong):

The registration of the below:
env.createTemporarySystemFunction("LatestNonNullLong", 
classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", 
classOf[LatestNonNull[String]])


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, 
TypeInference}

case class LatestNonNullAccumulator[T](
var value: T = null.asInstanceOf[T],
var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, 
LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: 
LocalDate): Unit = {
if (value != null) {
  Option(acc.date).fold {
acc.value = value
acc.date = date
  } { accDate =>
if (date != null && date.isAfter(accDate)) {
  acc.value = value
  acc.date = date
}
  }
}
  }

  def merge(
  acc: LatestNonNullAccumulator[T],
  it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
val iter = it.iterator()
while (iter.hasNext) {
  val a = iter.next()
  if (a.value != null) {
Option(acc.date).fold {
  acc.value = a.value
  acc.date = a.date
} { accDate =>
  Option(a.date).map { curDate =>
if (curDate.isAfter(accDate)) {
  acc.value = a.value
  acc.date = a.date
}
  }
}
  }
}
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
acc.value = null.asInstanceOf[T]
acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): 
TypeInference = {
TypeInference
  .newBuilder()
  .inputTypeStrategy(InputTypeStrategies
.sequence(InputTypeStrategies.ANY, 
InputTypeStrategies.explicit(DataTypes.DATE(
  .accumulatorTypeStrategy { callContext =>
val accDataType = DataTypes.STRUCTURED(
  classOf[LatestNonNullAccumulator[T]],
  DataTypes.FIELD("value", 
callContext.getArgumentDataTypes.get(0)),
  DataTypes.FIELD("date", DataTypes.DATE()))

Optional.of(accDataType)
  }
  .outputTypeStrategy { callContext

Re: Trying to create a generic aggregate UDF

2021-01-20 Thread Dylan Forciea
Timo,

I converted what I had to Java, and ended up with the exact same issue as 
before where it will work if I only ever use it on 1 type, but not if I use it 
on multiple. Maybe this is a bug?

Dylan

On 1/20/21, 10:06 AM, "Dylan Forciea"  wrote:

Oh, I think I might have a clue as to what is going on. I notice that it 
will work properly when I only call it on Long. I think that it is using the 
same generated code for the Converter for whatever was called first.

Since in Scala I can't declare an object as static within the class itself, 
I wonder if it won't generate appropriate Converter code per subtype. I tried 
creating a subclass that is specific to the type within my class and returning 
that as the accumulator, but that didn't help. And, I can't refer to that class 
in the TypeInference since it isn't static and I get an error from Flink 
because of that. I'm going to see if I just write this UDF in Java with an 
embedded public static class like you have if it will solve my problems. I'll 
report back to let you know what I find. If that works, I'm not quite sure how 
to make it work in Scala.

Regards,
Dylan Forciea

On 1/20/21, 9:34 AM, "Dylan Forciea"  wrote:

As a side note, I also just tried to unify into a single function 
registration and used _ as the type parameter in the classOf calls there and 
within the TypeInference definition for the accumulator and still ended up with 
the exact same stack trace.

Dylan

On 1/20/21, 9:22 AM, "Dylan Forciea"  wrote:

Timo,

I appreciate it! I am using Flink 1.12.0 right now with the Blink 
planner. What you proposed is roughly what I had come up with the first time 
around that resulted in the stack trace with the ClassCastException I had 
originally included. I saw that you had used a Row instead of just the value in 
our example, but changing it that way didn't seem to help, which makes sense 
since the problem seems to be in the code generated for the accumulator 
Converter and not the output. 

Here is the exact code that caused that error (while calling 
LatestNonNullLong):

The registration of the below:
env.createTemporarySystemFunction("LatestNonNullLong", 
classOf[LatestNonNull[Long]])
env.createTemporarySystemFunction("LatestNonNullString", 
classOf[LatestNonNull[String]])


The class itself:

import java.time.LocalDate
import java.util.Optional
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{InputTypeStrategies, 
TypeInference}

case class LatestNonNullAccumulator[T](
var value: T = null.asInstanceOf[T],
var date: LocalDate = null)

class LatestNonNull[T] extends AggregateFunction[T, 
LatestNonNullAccumulator[T]] {

  override def createAccumulator(): LatestNonNullAccumulator[T] = {
LatestNonNullAccumulator[T]()
  }

  override def getValue(acc: LatestNonNullAccumulator[T]): T = {
acc.value
  }

  def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: 
LocalDate): Unit = {
if (value != null) {
  Option(acc.date).fold {
acc.value = value
acc.date = date
  } { accDate =>
if (date != null && date.isAfter(accDate)) {
  acc.value = value
  acc.date = date
}
  }
}
  }

  def merge(
  acc: LatestNonNullAccumulator[T],
  it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
val iter = it.iterator()
while (iter.hasNext) {
  val a = iter.next()
  if (a.value != null) {
Option(acc.date).fold {
  acc.value = a.value
  acc.date = a.date
} { accDate =>
  Option(a.date).map { curDate =>
if (curDate.isAfter(accDate)) {
  acc.value = a.value
  acc.date = a.date
}
  }
}
  }
}
  }

  def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
acc.value = null.asInstanceOf[T]
acc.date = null
  }

  override def getTypeInference(typeFactory: DataTypeFactory): 
TypeInference = {
TypeInference
  .newBuilder()
  .inputTypeStrateg

Re: question about timers

2021-01-20 Thread Seth Wiesman
Yes,

Processing time timers that should have fired will fire immediately in
order.

Event time timers are never *late*, they will just fire when the watermark
advances.
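
A small sketch of the processing-time case (names are illustrative):

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Registers a processing-time timer 15 minutes after each element. If the job
// is restored after the target time has already passed, onTimer is invoked
// immediately on restore rather than being skipped.
public class DelayedAlert extends KeyedProcessFunction<String, String, String> {

    private static final long DELAY_MS = 15 * 60 * 1000L;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + DELAY_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}
```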

Seth

On Tue, Jan 19, 2021 at 3:58 PM Marco Villalobos 
wrote:

> If there are timers that have been checkpointed (we use rocksdb), and the
> system goes down, and then the system goes back up after the timers should
> have fired, do those timers that were skipped still fire, even though we
> are past that time?
>
> example:
>
> for example, if the current time is 1:00 p.m.  And the timer is supposed
> to fire at 1:15 p.m.
> and the system crashes at 1:00 p.m., but is brought back up at 1:20 p.m.
>
> Does the timer still fire?
>


Re: Restoring from a checkpoint or savepoint on a different Kafka consumer group

2021-01-20 Thread Rex Fenley
Thanks!

On Tue, Jan 19, 2021 at 9:47 PM Piotr Nowojski 
wrote:

> Hi Rex,
>
> Sorry, I might have misled you. I think you were right in your previous
> email
>
> > So from the sounds of things, regardless of the consumer group's
> offsets, it will always start from a checkpoint or savepoints offsets if
> there are some (unless checkpointing offsets is turned off).
> >
> > Is this interpretation correct?
>
> I think this is correct. `setStartFromGroupOffsets` and other `setStart*`
> variants take effect only if there are no offsets stored in the state. I
> would suggest you try it out regardless.
>
> If you want to duplicate a job for some testing, each of the duplicated
> jobs will have it's own sets of offsets and they will read records
> independently, but starting from the same starting point (when the job was
> duplicated).
>
> Piotrek
>
> On Tue, Jan 19, 2021 at 20:19, Rex Fenley wrote:
>
>> Thank you,
>>
>> That's unfortunate, because I imagine we often will want to duplicate a
>> job in order to do some testing out-of-bound from the normal job while
>> slightly tweaking / tuning things. Is there any way to transfer offsets
>> between consumer groups?
>>
>> On Tue, Jan 19, 2021 at 5:45 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> > I read this as, "The offsets committed to Kafka are ignored, the
>>> offsets committed within a checkpoint are used".
>>>
>>> yes, exactly
>>>
>>> > So from the sounds of things, regardless of the consumer group's
>>> offsets, it will always start from a checkpoint or savepoints offsets if
>>> there are some (unless checkpointing offsets is turned off).
>>>
>>> Yes. But, keep in mind this part:
>>>
>>> > setStartFromGroupOffsets (default behaviour): Start reading partitions
>>> from the consumer group’s (group.id setting in the consumer properties)
>>> committed offsets in Kafka brokers.* If offsets could not be found for
>>> a partition, the auto.offset.reset setting in the properties will be used.*
>>>
>>> As I understand it, if you are using the default
>>> `setStartFromGroupOffsets`, and you happen to change `group.id` (which
>>> is what I believe you were asking about in the first e-mail), after
>>> changing the `group.id` FlinkKafkaConsumer will not be able to found
>>> previously saved offsets in the Flink's state and it will start reading
>>> from completely new set of offsets. The same way as if this would be a
>>> freshly started new job without any state. Those new offsets would be as
>>> specified/defined via `auto.offset.reset`.
>>>
>>> Piotrek
>>>
>>>
>>> On Mon, Jan 18, 2021 at 18:12, Rex Fenley wrote:
>>>
 Thank you,

 Some parts that stick out
 >The Flink Kafka Consumer allows configuring the behaviour of how
 offsets are committed back to Kafka brokers. Note that the Flink Kafka
 Consumer does not rely on the committed offsets for fault tolerance
 guarantees. The committed offsets are only a means to expose the consumer’s
 progress for monitoring purposes.

 I read this as, "The offsets committed to Kafka are ignored, the
 offsets committed within a checkpoint are used".

 >With Flink’s checkpointing enabled, the Flink Kafka Consumer will
 consume records from a topic and periodically checkpoint all its Kafka
 offsets, together with the state of other operations. In case of a job
 failure, Flink will restore the streaming program to the state of the
 latest checkpoint and re-consume the records from Kafka, starting from the
 offsets that were stored in the checkpoint.

 This seems to say something similar.

 So from the sounds of things, regardless of the consumer group's
 offsets, it will always start from a checkpoint or savepoints offsets if
 there are some (unless checkpointing offsets is turned off).

 Is this interpretation correct?

 Thanks!


 On Mon, Jan 18, 2021 at 3:23 AM Piotr Nowojski 
 wrote:

> Hi Rex,
>
> I believe this section answers your question [1]
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> On Mon, Jan 18, 2021 at 09:00, 赵一旦 wrote:
>
>> If you changed the consumer group in your new job, the group id will
>> be the new one you set.
>> The job will continue to consumer the topics from the
>> savepoint/checkpoint you specified no matter whether the group id is the
>> original one?
>>
>> On Mon, Jan 18, 2021 at 12:53 PM, Rex Fenley wrote:
>>
>>> Hello,
>>>
>>> When using the Kafka consumer connector, if we restore a from a
>>> checkpoint or savepoint using a differently named consumer group than 
>>> the
>>> one we originally ran a job with will it still pick up exactly where it
>>> left off or are you locked into using the same consumer group as before?
>>>
>>> Thanks!
>>>
>>> --
>>>

Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB

2021-01-20 Thread David Haglund
I have an update. I have created a small project on github, 
https://github.com/daha/flink-key-by-problem-with-rocksdb-state,  which 
reproduces the issue.

There seems to be a problem with RocksDB in all versions I have tested (from 
1.7.1 and later). In Flink 1.9.x only one of the events is counted with RocksDB. 
In Flink 1.10.x and later all events are counted, but under separate keys, when 
all/both events should be counted using the same key.

The main branch in my sample project is using Flink 1.11.3, then there are 
branches for Flink 1.9.1, 1.10.3 and 1.12.1.

Best regards,
/David Haglund

From: David Haglund 
Date: Wednesday, 20 January 2021 at 09:38
I have encountered a problem in Flink when trying to upgrade from Flink 1.9.1 to
Flink 1.11.3.

The problem is a combination of 2 components:

* Keys implemented as case classes in Scala where we override the equals and
  hashCode methods. The case class has additional fields which are not used in
  the keyBy (hashCode/equals) but can have different values for a specific key
  (the fields we care about).
* Checkpointing with RocksDB

In Flink 1.9.1 everything worked fine, but in Flink 1.11.3 we got aggregations
for each unique key including the parameters which we did not want to include in
the keyBy, and which we explicitly do not use in hashCode and equals. It looks like
hashCode is ignored in the keyBy in our case when we use RocksDB for
checkpoints.

We do not see this problem if we disable checkpointing or when using
FsStateBackend.

I have seen this with "Incremental Window Aggregation with AggregateFunction"
[1], but a colleague of mine reported he had seen the same issue with
KeyedProcessFunction too.

We are using Scala version 2.11.12 and Java 8.

This looks like a bug to me. Is it a known issue or a new one?

Best regards,
/David Haglund

[1] Incremental Window Aggregation with AggregateFunction
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction

David Haglund
Systems Engineer
Fleet Perception for Maintenance
NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping
Sweden
Mobile: +46 705 634 848
david.hagl...@niradynamics.se
www.niradynamics.se
Together for smarter safety



Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-20 Thread Rex Fenley
Hello,

Is it safe to convert a non-mini-batch job to a mini-batch job when
restoring from a checkpoint or a savepoint?
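
For context, the mini-batch behaviour in question is controlled by the table 
options below (a sketch; the values are illustrative only):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchSettingsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // The three options that enable/tune mini-batch aggregation.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "1 s");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "1000");
    }
}
```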

Thanks

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted

2021-01-20 Thread Hailu, Andreas [Engineering]
Hello,

We're running 1.9.2 on YARN, and are seeing some interesting behavior when 
submitting jobs in a multi-threaded fashion to an application's Flink cluster. 
The error we see reported in the client application logs is the following:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 8e1c2fdd68feee100d8fee003efef3e2)
   at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
   at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
   at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
   at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
...
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
   at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
   at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
   at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
   at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
   at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
   at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
   at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
   at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
   at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   ... 3 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
server error., 0...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2>.
2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-64] 
org.apache.flink.yarn.YarnResourceManager - Request slot 
with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, 
directMemoryInMB=-1, nativeMemoryInMB=-1, networkMemoryInMB=-1, 
managedMemoryInMB=-1} for job 8e1c2fdd68feee100d8fee003efef3e2 with allocation 
id 5bca3bde577f93169928e04749b45343.
2021-01-20 14:08:45,199 INFO  [flink-akka.actor.default-dispatcher-30] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received 
JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 
20 14:01:42 EST 2021).
2021-01-20 14:09:19,981 INFO  [flink-akka.actor.default-dispatcher-90] 
org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the 
JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 
2021(8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:09:19,982 INFO  [flink-akka.actor.default-dispatcher-90] 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink Java 
Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched 
from state RUNNING to FAILING.

It would appear that, for the job with ID 8e1c2fdd68feee100d8fee003efef3e2, the 
cluster somehow received the submission request twice? The client log only shows 
a single submission for this job:

2021-01-20 14:01:55,775 [ProductHistory-18359] INFO  RestClusterClient - 
Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (detached: false).

So while the job is submitted a single time, the dispatcher somehow tries to 
perform two submissions resulting in a failure. How does this happen?



Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices


Re: Kubernetes HA Services - artifact for KubernetesHaServicesFactory

2021-01-20 Thread Ashish Nigam
I did move to 1.12.1 version and also ensured that docker has kubernetes
jar file in the right location, i.e.
/opt/flink/plugins/s3-fs-presto/flink-kubernetes_2.12-1.12.1.jar

But job manager is still not able to find the class

2021-01-21 00:00:49,376 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not
start cluster entrypoint StandaloneApplicationClusterEntryPoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint StandaloneApplicationClusterEntryPoint.
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:94)
[flink-dist_2.12-1.11.2.jar:1.11.2]
Caused by: org.apache.flink.util.FlinkException: Could not instantiate
class
'org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory'
of type
'org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory'.
Please make sure that this class is on your class path.
at
org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:352)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.loadCustomHighAvailabilityServicesFactory(HighAvailabilityServicesUtils.java:263)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:246)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:126)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:306)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:269)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:211)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
... 2 more
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]

On Mon, Jan 18, 2021 at 7:52 PM Yang Wang  wrote:

> Class
> "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory"
> is introduced from 1.12.0 version.
> You could try with the latest version 1.12.1[1].
>
> Will that jar file need to be copied under /opt/flink/plugins/s3-fs-presto
>> folder?
>
> Yes, you need to copy the s3 fs implementation to the plugin directory.
> An alternative is that you could set the environment variable to enable the plugin[2].
>
> [1].
> https://hub.docker.com/r/apache/flink/tags?page=1&ordering=last_updated&name=1.12.1
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/standalone/docker.html#using-filesystem-plugins
>
> Best,
> Yang
>
> On Mon, Jan 18, 2021 at 11:15 PM, Ashish Nigam wrote:
>
>> Hi,
>> I am not able to identify maven artifact that will have
>> implementation for this class
>>
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>
>>
>> I am using info in this link to test out HA implementation
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>>
>> Please point me to the right artifact. Also, I plan to use S3 bucket as
>> storageDir. So, will that jar file need to be copied under
>> /opt/flink/plugins/s3-fs-presto folder?
>>
>> Thanks
>> Ashish
>>
>


Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-20 Thread Rex Fenley
Just tested this and I couldn't restore from a savepoint. If I do a new job
from scratch, can I tune the minibatch parameters and restore from a
savepoint without having to make yet another brand new job?

Thanks


On Wed, Jan 20, 2021 at 12:43 PM Rex Fenley  wrote:

> Hello,
>
> Is it safe to convert a non-mini-batch job to a mini-batch job when
> restoring from a checkpoint or a savepoint?
>
> Thanks
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



Re: Kubernetes HA Services - artifact for KubernetesHaServicesFactory

2021-01-20 Thread Yang Wang
You do not need to put flink-kubernetes_2.12-1.12.1.jar under the plugin
directory. Only the S3 fs jar
needs to be put there.

I think I found the root cause. It seems your flink-dist is still 1.11.
Do you want to use the 1.11 image and enable the K8s HA at the same time?
That cannot work, because the HA-related code has been
refactored in release 1.12.
Even if you copy the flink-kubernetes_2.12-1.12.1.jar to /opt/flink/lib
so that the KubernetesHaServicesFactory class
can be resolved, it will run into other issues after that.

Could you please try with the latest Flink 1.12 image?
docker pull apache/flink:1.12.1
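
For reference, with a 1.12 image the HA-related configuration would look roughly 
like this (a sketch; the cluster-id and storage path are placeholders):

```yaml
# flink-conf.yaml sketch; cluster-id and storageDir are placeholders.
kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink/ha
```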

Best,
Yang

On Thu, Jan 21, 2021 at 8:05 AM, Ashish Nigam wrote:

> I did move to 1.12.1 version and also ensured that docker has kubernetes
> jar file in the right location, i.e.
> /opt/flink/plugins/s3-fs-presto/flink-kubernetes_2.12-1.12.1.jar
>
> But job manager is still not able to find the class
>
> 2021-01-21 00:00:49,376 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not
> start cluster entrypoint StandaloneApplicationClusterEntryPoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint StandaloneApplicationClusterEntryPoint.
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
> [flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:94)
> [flink-dist_2.12-1.11.2.jar:1.11.2]
> Caused by: org.apache.flink.util.FlinkException: Could not instantiate
> class
> 'org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory'
> of type
> 'org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory'.
> Please make sure that this class is on your class path.
> at
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:352)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.loadCustomHighAvailabilityServicesFactory(HighAvailabilityServicesUtils.java:263)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:246)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:126)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:306)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:269)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:211)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> ... 2 more
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
> at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
> Source) ~[?:?]
> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>
> On Mon, Jan 18, 2021 at 7:52 PM Yang Wang  wrote:
>
>> Class
>> "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory"
>> is introduced from 1.12.0 version.
>> You could try with the latest version 1.12.1[1].
>>
>> Will that jar file need to be copied under
>>> /opt/flink/plugins/s3-fs-presto folder?
>>
>> Yes, you need to copy the s3 fs implementation to plugin directory.
>> An alternative is you could set the environment to enable the plugin[1].
>>
>> [1].
>> https://hub.docker.com/r/apache/flink/tags?page=1&ordering=last_updated&name=1.12.1
>> [2].
>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/standalone/docker.html#using-filesystem-plugins
>>
>> Best,
>> Yang
>>
>> On Mon, Jan 18, 2021 at 11:15 PM, Ashish Nigam wrote:
>>
>>> Hi,
>>> I am not able to identify maven artifact that will have
>>> implementation for this class
>>>
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>
>>>
>>> I am using info in this link to test out HA implementation
>

Re: Flink Jobmanager HA deployment on k8s

2021-01-20 Thread Yang Wang
Hi Chirag Dewan,

Yes, we could have multiple replicas with ZK HA in K8s as well. Multiple
JobManagers will contend for
leadership, and the leader then writes its rpc address to the ZooKeeper nodes. You could
find more information about how the
HA service works here[1]. It is about the KubernetesHAService, but the
ZooKeeperHAService has the same
mechanism.

In such a case, I strongly suggest not using the service as the JobManager
rpc address. Otherwise, we
will have the issue you have mentioned. There are 3 replicas behind the
same service endpoint and only
one of them is the leader. TaskManager/Client do not know how to contact
the leader.

Instead, I suggest not creating the internal service and binding the pod IP to
the JobManager rpc address.
After that, the TaskManager/Client will retrieve the leader address (pod IP +
port) and contact the leader via that address (see the sketch below).

Please find more information and the example here[1].

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
[2].
https://issues.apache.org/jira/browse/FLINK-20982?focusedCommentId=17265715&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17265715
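
A rough sketch of binding the rpc address to the pod IP via the downward API 
(container names are illustrative; FLINK_PROPERTIES is one way the official 
image lets you override configuration, not the only one):

```yaml
# Sketch only: not the exact manifest from the thread.
spec:
  containers:
    - name: jobmanager
      image: apache/flink:1.12.1
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # The official image appends FLINK_PROPERTIES to flink-conf.yaml, so the
        # rpc address becomes the pod IP instead of a service name.
        - name: FLINK_PROPERTIES
          value: "jobmanager.rpc.address: $(POD_IP)"
```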

Best,
Yang


On Wed, Jan 20, 2021 at 12:27 PM, Amit Bhatia wrote:

>   Hi Yang,
>
> I tried the deployment of flink with three replicas of the JobManager to test a
> faster job recovery scenario.  Below is my deployment:
>
>  $ kubectl get po -namit | grep zk
> eric-data-coordinator-zk-01/1
> Running0  6d21h
> eric-data-coordinator-zk-11/1
> Running0  6d21h
> eric-data-coordinator-zk-21/1
> Running0  6d21h
> flink-jobmanager-ha-zk-1-5d58dc469-8bjpb  1/1
> Running0  19h
> flink-jobmanager-ha-zk-1-5d58dc469-klg5p  1/1
> Running0  19h
> flink-jobmanager-ha-zk-1-5d58dc469-kvwzk  1/1
> Running0  19h
>
>
>  $ kubectl get svc -namit | grep zk
> flink-jobmanager-ha-rest-zk1                NodePort    10.100.118.186   8081:32115/TCP                21h
> flink-jobmanager-ha-zk1                     ClusterIP   10.111.135.174   6123/TCP,6124/TCP,8081/TCP    21h
> eric-data-coordinator-zk                    ClusterIP   10.105.139.167   2181/TCP,8080/TCP,21007/TCP   7d20h
> eric-data-coordinator-zk-ensemble-service   ClusterIP   None             2888/TCP,3888/TCP             7d20h
>
> Flink Configmap:
> 
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config-ha-zk-1
>   namespace: amit
>   labels:
> app: flink
> data:
>   flink-conf.yaml: |+
> jobmanager.rpc.address: flink-jobmanager-ha-zk1
> taskmanager.numberOfTaskSlots: 2
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> queryable-state.proxy.ports: 6125
> jobmanager.memory.process.size: 1600m
> taskmanager.memory.process.size: 1728m
> parallelism.default: 2
> # High Availability parameters
> high-availability: zookeeper
> high-availability.cluster-id: /haclusterzk1
> high-availability.storageDir: file:///opt/flink/recovery/
> high-availability.zookeeper.path.root: /flinkhazk
> high-availability.zookeeper.quorum: eric-data-coordinator-zk:2181
> high-availability.jobmanager.port: 6123
> ===
>
> Out of the three JobManager pod replicas, in one of the pods I am
> getting this error:
>
> 2021-01-19 08:18:33,982 INFO
>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService []
> - Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
> 2021-01-19 08:21:39,381 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
> 2021-01-19 08:21:42,521 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
> 2021-01-19 08:21:45,508 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
> 2021-01-19 08:21:46,369 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
> 2021-01-19 08:22:13,658 WARN
>  org.apache.

Re: Flink Jobmanager HA deployment on k8s

2021-01-20 Thread Yang Wang
Hi Amit Bhatia

> What is the correct way to start three JobManager replicas with ZK? Is
there any link which explains this deployment scenario and configuration?
Please find more information in my last mail. Unfortunately, we do not
have documentation to guide users on how to achieve that.

> How will we identify which of the three JobManager replicas
is the leader?
Just like I have said, using a K8s service for the JobManager rpc
address is not a good practice:
the TaskManager/Client could not know which replica is the leader. Instead, we
should bind the rpc address
to the pod IP. After that, the TaskManager/Client can find the leader address (pod
IP) via ZooKeeper.

Could you please update your yaml files and deploy again? I think you will
have different results then.

Best,
Yang


Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Kurt Young
cc'ing this to the user & user-zh mailing lists because this will affect lots of
users, and quite a lot of users
have been asking questions around this topic.

Let me try to understand this from a user's perspective.

Your proposal will affect five functions, which are:

   - PROCTIME()
   - NOW()
   - CURRENT_DATE
   - CURRENT_TIME
   - CURRENT_TIMESTAMP

Before the changes, as I am writing this reply, the local time here is
2021-01-21 12:03:35 (Beijing time, UTC+8).
And I tried these 5 functions in the SQL client, and got:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;

+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 |   2021-01-21 | 04:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
After the changes, the expected behavior will change to:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;

+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 |   2021-01-21 | 12:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
The return type of NOW(), PROCTIME() and CURRENT_TIMESTAMP would still be
TIMESTAMP.
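
For reference, these wall-clock values would follow the session time zone, which can already be configured on the TableConfig. A minimal sketch (the zone value and the query are only illustrative, and whether the functions honor this setting is exactly what the proposal is about):

import java.time.ZoneId;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SessionTimeZoneExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Set the session time zone (illustrative value).
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // With the proposed change, the wall-clock functions would be expected to
        // reflect this zone, e.g. 12:03:35 instead of 04:03:35 in the example above.
        tEnv.executeSql(
                "SELECT NOW(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME")
            .print();
    }
}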

Best,
Kurt


On Tue, Jan 19, 2021 at 6:42 PM Leonard Xu  wrote:

> I found the above example format may get messed up in different mail clients, so I post
> a picture here[1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
> <
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png>
>
>
> On Jan 19, 2021, at 16:22, Leonard Xu wrote:
> >
> > Hi, all
> >
> > I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> >
> > Currently some temporal function behaviors are weird to users.
> > 1.  When users use a PROCTIME() in SQL, the value of PROCTIME() has a
> timezone offset with the wall-clock time in users' local time zone, users
> need to add their local time zone offset manually to get expected local
> timestamp(e.g: Users in Germany need to +1h to get expected local
> timestamp).
> >
> > 2. Users cannot use CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP to get a
> wall-clock timestamp in the local time zone, and thus they need to write a UDF in
> their SQL just for implementing a simple filter like WHERE date_col =
> CURRENT_DATE.
> >
> > 3. Another common case is the time window with a day interval based on
> PROCTIME(): users plan to put all data from one day into the same window,
> but the window is assigned using the timestamp in the UTC+0 time zone rather than
> the session time zone, which leads to the window starting with an offset (e.g.:
> users in China need to add -8h at the start of their business SQL and then +8h
> when outputting the result; the conversion feels like magic to users).
> >
> > These problems come from the fact that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on the UTC+0 time zone.
> >
> > This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document[1] to help understand them better. You can also learn about the
> three timestamp types' behavior in the Hadoop ecosystem from the reference links
> in the doc.
> >
> >
> > I investigated all Flink time-related functions' current behavior and
> compared them with other DB vendors like Pg, Presto, Hive, Spark and Snowflake. I
> made an Excel sheet [2] to organize them well; we can use it for the next
> discussion. Please let me know if I missed something.
> > From my investigation, I think we need to correct the behavior of the
> functions NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP. To
> correct them, we can change the function return type, the function return
> value, or both. All of those ways are
> valid because SQL:2011 does not specify the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP function,

A few questions about minibatch

2021-01-20 Thread Rex Fenley
Hi,

Our job was experiencing high write amplification on aggregates, so we
decided to give mini-batch a go. There are a few things I've noticed that are
different from our previous job, and I would like some clarification.

1) Our operators now say they have Watermarks. We never explicitly added
watermarks, and our state is essentially unbounded across all time since it
consumes from Debezium and reshapes our database data into another store.
Why does it say we have Watermarks then?

2) In our sources I see MiniBatchAssigner(interval=[1000ms],
mode=[ProcTime]); what does that do?

3) I don't really see anything else different yet in the shape of our plan
even though we've turned on
configuration.setString(
"table.optimizer.agg-phase-strategy",
"TWO_PHASE"
)
Is there a way to check that this optimization is on? We use user-defined
aggregate functions; does it work for UDAFs?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com
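
Regarding (2) and (3): below is a minimal sketch of enabling mini-batch together with two-phase aggregation and then checking the optimized plan. The option keys are the documented ones; the latency/size values and the query placeholder are only illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchCheck {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        Configuration conf = tEnv.getConfig().getConfiguration();
        // Mini-batch buffers input records and fires the aggregation once per
        // interval, which reduces state accesses and hence write amplification.
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "1 s");
        conf.setString("table.exec.mini-batch.size", "1000");
        // Split aggregations into a local and a global phase (requires mini-batch).
        conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

        // Printing the optimized plan shows the MiniBatchAssigner nodes and, when
        // the two-phase rewrite applies, LocalGroupAggregate/GlobalGroupAggregate
        // operators:
        // System.out.println(tEnv.explainSql("<your aggregation query>"));
    }
}

Whether the two-phase rewrite applies to a user-defined aggregate function generally depends on it being mergeable (i.e. implementing merge()), so inspecting the explained plan is the most reliable check.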



Re: Handling validations/errors in the flink job

2021-01-20 Thread sagar
Hi Team, any answer to my question below?

On Wed, Jan 20, 2021 at 9:20 PM sagar  wrote:

> Hi Team,
>
>
> I am creating a flink job with DataStream API and batch mode.
>
> It has 5 different bounded sources, and I need to perform some
> business operations on them like joining, aggregating, etc.
>
>
>
> I am using a CoGroup operator to join two streams since it serves as a left
> join. So when the key is present in both streams, I process the records and
> move ahead.
>
> But when the key is present in only one stream, I need to send it as an error.
>
>
>
> Some operators like Process have side output features, but CoGroup doesn't
> have that feature.
>
>
>
> In order to report missing data to a different stream, I am planning to
> create one common error-handling stream, and at each CoGroup operation I am
> planning to write to the error stream by using a Split operator after the CoGroup.
>
>
>
> Let me know if that is the correct way of handling the errors?
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is a confidential mail, all rights are reserved. If you are not the
> intended recipient, please ignore this email.
>


-- 
---Regards---

  Sagar Bandal

This is a confidential mail, all rights are reserved. If you are not the
intended recipient, please ignore this email.
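
Since CoGroup does not expose side outputs and split() is deprecated, one possible pattern is to emit a small tagged wrapper from the CoGroupFunction and split the result right afterwards with a ProcessFunction side output. A minimal sketch, with hypothetical class and field names:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class CoGroupErrorRouting {

    /** Wrapper emitted by the co-group: either a joined value or an error message. */
    public static class Tagged {
        public String value;
        public String error;
    }

    public static final OutputTag<String> ERRORS = new OutputTag<String>("join-errors") {};

    /** Left join that emits an error record when the right side has no match. */
    public static class LeftJoinWithErrors implements CoGroupFunction<String, String, Tagged> {
        @Override
        public void coGroup(Iterable<String> left, Iterable<String> right, Collector<Tagged> out) {
            for (String l : left) {
                boolean matched = false;
                for (String r : right) {
                    matched = true;
                    Tagged joined = new Tagged();
                    joined.value = l + "," + r;
                    out.collect(joined);
                }
                if (!matched) {
                    Tagged missing = new Tagged();
                    missing.error = "no matching key for: " + l;
                    out.collect(missing);
                }
            }
        }
    }

    /** Splits the tagged stream: errors go to the side output, results continue. */
    public static SingleOutputStreamOperator<String> split(DataStream<Tagged> coGrouped) {
        return coGrouped.process(new ProcessFunction<Tagged, String>() {
            @Override
            public void processElement(Tagged t, Context ctx, Collector<String> out) {
                if (t.error != null) {
                    ctx.output(ERRORS, t.error);
                } else {
                    out.collect(t.value);
                }
            }
        });
    }
}

The error records are then obtained with getSideOutput(CoGroupErrorRouting.ERRORS) on the returned operator and can be unioned across all the CoGroup steps into the common error-handling stream.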


Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Jark Wu
Great examples to understand the problem and the proposed changes, @Kurt!

Thanks Leonard for investigating this problem.
The time-zone problems around time functions and windows have bothered a
lot of users. It's time to fix them!

The return value changes sound reasonable to me, and keeping the return
type unchanged will minimize the surprise to the users.
Besides that, I think it would be better to mention how this affects the
window behaviors, and the interoperability with DataStream.

I think this definitely deserves a FLIP.



Hi zhisheng,

Do you have examples to illustrate which case will get the wrong window
boundaries?
That will help to verify whether the proposed changes can solve your
problem.

Best,
Jark


On Thu, 21 Jan 2021 at 12:54, zhisheng <173855...@qq.com> wrote:

> Thanks to Leonard Xu for discussing this tricky topic. At present, there
> are many Flink jobs in our production environment that are used to count
> day-level reports (e.g., counting PV/UV).
>
>
> If we use the default Flink SQL, the window time range of the
> statistics is incorrect, and then the statistical results will naturally be
> incorrect.
>
>
> The user needs to deal with the time zone manually in order to solve the
> problem. 
>
>
> If Flink itself can solve these time zone issues, then I think it will be
> user-friendly.
>
>
> Thank you
>
>
> Best! 
> zhisheng
>
>
> -- Original Message --
> From: "dev" <xbjt...@gmail.com>;
> Sent: Tuesday, Jan 19, 2021, 6:35 PM
> To: "dev"
> Subject: Re: [DISCUSS] Correct time-related function behavior in Flink SQL
>
>
>
> I found the above example format may get messed up in different mail clients, so I post
> a picture here[1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
> <
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png>;
>
>
> > On Jan 19, 2021, at 16:22, Leonard Xu wrote:
> > Hi, all
> >
> > I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> >
> > Currently some temporal function behaviors are weird to users.
> > 1.  When users use a PROCTIME() in SQL, the value of PROCTIME()
> has a timezone offset with the wall-clock time in users' local time zone,
> users need to add their local time zone offset manually to get expected
> local timestamp(e.g: Users in Germany need to +1h to get expected local
> timestamp).
> >
> > 2. Users cannot use
> CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP to get a wall-clock
> timestamp in the local time zone, and thus they need to write a UDF in their SQL
> just for implementing a simple filter like WHERE date_col =
> CURRENT_DATE.
> >
> > 3. Another common case is the time window with a day
> interval based on PROCTIME(): users plan to put all data from one day into
> the same window, but the window is assigned using the timestamp in the UTC+0
> time zone rather than the session time zone, which leads to the window starting
> with an offset (e.g.: users in China need to add -8h at the start of their business SQL
> and then +8h when outputting the result; the conversion feels like magic to
> users).
> >
> > These problems come from the fact that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on the UTC+0 time zone.
> >
> > This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document[1] to help understand them better. You can also learn about the
> three timestamp types' behavior in the Hadoop ecosystem from the reference links
> in the doc.
> >
> >
> > I investigated all Flink time-related functions' current behavior and
> compared them with other DB vendors like Pg, Presto, Hive, Spark and
> Snowflake. I made an Excel sheet [2] to organize them well; we can use it
> for the next discussion. Please let me know if I missed something.
> > From my investigation, I think we need to correct the behavior of the
> functions NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP. To
> correct them, we can change the function return type, the function return
> value, or both. All of those ways are
> valid because SQL:2011 does not specify the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP function,
> >
> > FLINK | current behavior | existed problem | other vendors' behavior | proposed change
> > CURRENT_TIMESTAMP  CURRENT_TIMESTAMP
> > TIMESTAMP(0) NOT NULL
> >
> > #session timezone: UTC
> > 2020-12-28T23:52:52
> >
> > #session timezone: UTC+8
> > 2020-