Re: Even key distribution workload

2019-07-15 Thread Caizhi Weng
Hi Navneeth,

Is it possible for you to first keyBy something other than user id (for
example, message id), then aggregate the messages of the same user within
each keyed stream, and finally aggregate across the keyed streams to get a
per-user result?
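
A minimal sketch of that two-phase idea (the Event type, the bucket count and
the window size below are hypothetical placeholders, not taken from your job):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record type; replace with your own message type.
case class Event(userId: String, messageId: String, value: Long)

val events: DataStream[Event] = ??? // the stream you currently keyBy user id

// Phase 1: key by (userId, bucket derived from messageId) so one hot user is
// spread over many parallel tasks, and pre-aggregate partial sums per bucket.
val partials: DataStream[(String, Long)] = events
  .keyBy(e => (e.userId, e.messageId.hashCode & 127))
  .timeWindow(Time.seconds(10))
  .reduce((a, b) => a.copy(value = a.value + b.value))
  .map(e => (e.userId, e.value))

// Phase 2: key by userId only and combine the (much smaller) partial results.
val perUser: DataStream[(String, Long)] = partials
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  .reduce((a, b) => (a._1, a._2 + b._2))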

Navneeth Krishnan wrote on Mon, Jul 15, 2019 at 2:38 PM:

> Hi All,
>
> Currently I have a keyBy user and I see uneven load distribution since
> some of the users would have very high load versus some users having very
> few messages. Is there a recommended way to achieve even distribution of
> workload? Has someone else encountered this problem and what was the
> workaround?
>
> Thanks
>


Re: Even key distribution workload

2019-07-15 Thread Biao Liu
Hi Navneeth,

The "keyBy" semantics require that all data under the same key goes to the
same task, so this data skew issue is basically caused by your data
distribution.
As far as I know, Flink does not handle data skew very well out of the box.
There is a proposal about local aggregation which is still under discussion
on the dev mailing list; it would alleviate the data skew, but I guess it
will still take some time.

As Caizhi mentioned, it's better to work around this in user code for now,
for example by redistributing the skewed data.


Navneeth Krishnan wrote on Mon, Jul 15, 2019 at 2:38 PM:

> Hi All,
>
> Currently I have a keyBy user and I see uneven load distribution since
> some of the users would have very high load versus some users having very
> few messages. Is there a recommended way to achieve even distribution of
> workload? Has someone else encountered this problem and what was the
> workaround?
>
> Thanks
>


Re: State incompatible

2019-07-15 Thread Haibo Sun
Hi,  Avi Levi


I don't think there's any way to solve this problem right now, and the Flink
documentation clearly states that this is not supported.


“Trying to restore state, which was previously configured without TTL, using 
TTL enabled descriptor or vice versa will lead to compatibility failure and 
StateMigrationException."


Flink Document: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl


Best,
Haibo

At 2019-07-14 16:50:19, "Avi Levi"  wrote:

Hi,

I added a TTL to my state.
Old version:
 private lazy val stateDescriptor = new ValueStateDescriptor("foo", 
Types.CASE_CLASS[DomainState])


vs. the new version:

@transient
private lazy val storeTtl = StateTtlConfig.newBuilder(90)
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupInRocksdbCompactFilter()
  .build()

  private lazy val stateDescriptor = {
val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
des.enableTimeToLive(storeTtl)
des
  }


BUT when trying to restore from a savepoint I get this error:


java.lang.RuntimeException: Error while getting state
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
...

Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer cannot be incompatible.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 11 more


Do you have any idea how I can resolve it?


Best wishes 

Re: State incompatible

2019-07-15 Thread Avi Levi
Thanks Haibo,
bummer ;)

On Mon, Jul 15, 2019 at 12:27 PM Haibo Sun  wrote:

> Hi,  Avi Levi
>
> I don't think there's any way to solve this problem right now, and Flink
> documentation clearly shows that this is not supported.
>
> “Trying to restore state, which was previously configured without TTL,
> using TTL enabled descriptor or vice versa will lead to compatibility
> failure and StateMigrationException."
>
> Flink Document:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
> 
>
> Best,
> Haibo
>
> At 2019-07-14 16:50:19, "Avi Levi"  wrote:
>
> Hi,
> I added a ttl to my state
> *old version :*
>  private lazy val stateDescriptor = new ValueStateDescriptor("foo",
> Types.CASE_CLASS[DomainState])
>
> *vs the new version *
>
> @transient
> private lazy val storeTtl = StateTtlConfig.newBuilder(90)
>   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>   .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>   .cleanupInRocksdbCompactFilter()
>   .build()
>
>   private lazy val stateDescriptor = {
> val des = new ValueStateDescriptor("foo",
> Types.CASE_CLASS[DomainState])
> des.enableTimeToLive(storeTtl)
> des
>   }
>
> *BUT when trying to restore from savepoint I am getting this error:*
>
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>   ...
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
>   at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
>   at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
>   at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
>   at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>   at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
>   at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>   ... 11 more
>
>
> Do you have any idea how can I resolve it ?
>
>
> Best wishes
>
>


Converting Metrics from a Reporter to a Custom Events mapping

2019-07-15 Thread Vijay Balakrishnan
Hi,
I need to capture the metrics sent from a Flink app to a Reporter and
transform them into an Events API format I have designed. I have been looking
at the Reporters (
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#list-of-all-variables)
and have used them, but what would be a best practice for capturing this
metrics data so that I can transform it?

The folks using the Flink app still want to see their metrics in the Flink
Dashboard using their chosen Reporter (not sure yet what they chose; assuming
ConsoleReporter). I need to capture those metrics, transform them into my
Events API format, and send them to a Kinesis stream.

We use Prometheus and InfluxDB in our environments for other purposes.

Should I use the SLF4J Reporter to dump the metrics into a log file/folder,
watch that with a Kinesis Agent, transform it somehow, and then send it to
the Kinesis data stream?

TIA,


Fwd: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi All,

I am trying to read data from a Kinesis stream, apply a SQL transformation
(distinct), and then write to a CSV sink, which is failing due to this issue
(org.apache.flink.table.api.TableException: AppendStreamTableSink requires
that Table has only insert changes.). The full code is here (
https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
).

Can anyone help me move forward on this issue?

Full Code:-

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new
FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(),
consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]] =
  new MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]]() {

override def map(s: String): Tuple10[String, String,
String,String,String,String,String,String,String,String] = {
  val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

  val csvData = data.getCc_num+","+
data.getFirst+","+
data.getLast+","+
data.getTrans_num+","+
data.getTrans_time+","+
data.getCategory+","+
data.getMerchant+","+
data.getAmt+","+
data.getMerch_lat+","+
data.getMerch_long

  //println(csvData)

  val p:Array[String] = csvData.split(",")
  var cc_num:String = p(0)
  var first:String = p(1)
  var last:String = p(2)
  var trans_num:String = p(3)
  var trans_time:String = p(4)
  var category:String = p(5)
  var merchant:String = p(6)
  var amt:String = p(7)
  var merch_lat:String = p(8)
  var merch_long:String = p(9)

  val creationDate: Time = new Time(System.currentTimeMillis())
  return new Tuple10(cc_num, first,
last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
}
  }

val data = kinesis.map(mapFunction)

//data.print()

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

val table1 = table.distinct()

tEnv.registerTable("fromAnotherTable",table1)

table.printSchema()

val csvSink:TableSink[Row]  = new
CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput","~");
val fieldNames:Array[String]  =
Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_column","amt_column","merch_lat","merch_long")
val fieldTypes:Array[TypeInformation[_]]  = Array(
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING
)

tEnv.registerTableSink("s3csvTargetTransaction", fieldNames,
fieldTypes, csvSink)

tEnv.sqlUpdate("INSERT INTO s3csvTargetTransaction SELECT
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
from fromAnotherTable")


-- 
Thanks & Regards
Sri Tummala


Re: Flink SQL API: Extra columns added from order by

2019-07-15 Thread Morrisa Brenner
Hi Caizhi and Rong,

Thanks for the responses! It's good to know that this is a known bug -
right now we're just using Flink 1.8 and will work around this, but we look
forward to getting the fixes in the future!

All the best,
Morrisa


On Mon, Jul 15, 2019 at 2:25 AM Caizhi Weng  wrote:

> (Oops, I mistakenly sent my response only to Rong Rong. Sorry Rong...)
>
> Hi Morrisa,
>
> This is due to a bug in the old flink planner. The `createTable(new
> PlannerQueryOperation(relational.rel))` method in flink-table-planner ->
> `TableEnvImpl` -> `sqlQuery` should be `createTable(new
> PlannerQueryOperation(relational.project()))` as the last projection (if
> exists) should be performed. This bug is already fixed in the blink planner
> (flink-table-planner-blink). You can use blink planner and runner instead
> of the legacy planner to solve this problem.
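
A minimal sketch of switching to the Blink planner (this assumes a Flink
release that ships it, i.e. 1.9+, so it is not directly applicable to 1.8):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

// Create a table environment backed by the Blink planner instead of the legacy one.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, settings)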
>
> Rong Rong wrote on Sat, Jul 13, 2019 at 7:08 AM:
>
>> Hi Morrisa,
>>
>> Can you share more information regarding what type of function
>> "formatDate" is and how did you configure the return type of that function?
>> For the question on the first query If the return type is String, then
>> ASC on a string value should be on alphabetical ordering.
>>
>> However on the third query, if the GROUP BY and ORDER BY are both
>> operating on the same input to your UDF it shouldn't be part of the output
>> columns.
>> This looks like a bug to me.
>>
>> --
>> Rong
>>
>> On Thu, Jul 11, 2019 at 11:45 AM Morrisa Brenner <
>> morrisa.bren...@klaviyo.com> wrote:
>>
>>> Hi Flink folks,
>>>
>>> We have a custom date formatting function that we use to format the
>>> output of columns containing dates. Ideally what we want is to format the
>>> output in the select statement but be able to order by the underlying
>>> datetime (so that and output with formatted dates "February 2019" and
>>> "April 2019" is guaranteed to have the rows sorted in time order rather
>>> than alphabetical order).
>>>
>>> When I go to add the unformatted column to the order by, however, that
>>> gets appended as an extra column to the select statement during the query
>>> planning process within Calcite. (In the order by parsing, it's considering
>>> this a different column from the one in the select statement.) When the
>>> group by column is different in the same way but there's no order by
>>> column, the extra column isn't added. I've included a couple of simple
>>> examples below.
>>>
>>> Is this the intended behavior of the query planner? Does anyone know of
>>> a way around this without needing to change the formatting so that it makes
>>> the output dates correctly sortable?
>>>
>>> Thanks for your help!
>>>
>>> Morrisa
>>>
>>>
>>>
>>> Example query and output with order by using formatted date:
>>>
>>> SELECT
>>>
>>> formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH'),
>>>
>>> sum(`testTable`.`count`)
>>>
>>> FROM `testTable`
>>>
>>> GROUP BY formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH')
>>>
>>> ORDER BY formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH') ASC
>>>
>>> Month          | SUM VALUE
>>> April 2019     | 1052
>>> February 2019  | 1
>>>
>>>
>>> Example query and output without order by but group by using unformatted
>>> date:
>>>
>>> SELECT
>>>
>>> formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH'),
>>>
>>> sum(`testTable`.`count`)
>>>
>>> FROM `testTable`
>>>
>>> GROUP BY floor(`testTable`.`timestamp` TO MONTH)
>>>
>>> Month          | SUM VALUE
>>> February 2019  | 1
>>> April 2019     | 1052
>>>
>>> We would like to enforce the ordering, so although this output is what
>>> we want, I don't think we can use this solution.
>>>
>>> Example query and output with order by using unformatted date:
>>>
>>> SELECT
>>>
>>> formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH'),
>>>
>>> sum(`testTable`.`count`)
>>>
>>> FROM `testTable`
>>>
>>> GROUP BY floor(`testTable`.`timestamp` TO MONTH)
>>>
>>> ORDER BY floor(`testTable`.`timestamp` TO MONTH) ASC
>>>
>>> Month          | SUM VALUE | (unformatted date column appended by ORDER BY)
>>> February 2019  | 1         | 2/1/2019 12:00 AM
>>> April 2019     | 1052      | 4/1/2019 12:00 AM
>>>
>>>
>>> --
>>> Morrisa Brenner
>>> Software Engineer
>>> 225 Franklin St, Boston, MA 02110
>>> klaviyo.com 
>>>
>>

-- 
Morrisa Brenner
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com 


Fwd: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

2019-07-15 Thread sri hari kali charan Tummala
Hi,

I am trying to write a Flink table to a streaming sink, and it fails when
casting Java to Scala (or Scala to Java). It fails at the step below; can
anyone help me out with this error?


val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new
Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
  new SimpleStringEncoder[Row]("UTF-8")).build()

table.addSink(sink2)


package com.aws.examples.kinesis.consumer.TransactionExample

import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants,
ConsumerConfigConstants}
import org.apache.flink.table.api.{Table, TableEnvironment}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}

import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.java.io.jdbc
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.java._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.sinks.TableSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.core.fs.Path

import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object KinesisConsumer {

  def main(args: Array[String]): Unit = {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new
FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(),
consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]] =
  new MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]]() {

override def map(s: String): Tuple10[String, String,
String,String,String,String,String,String,String,String] = {
  val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

  val csvData = data.getCc_num+","+
data.getFirst+","+
data.getLast+","+
data.getTrans_num+","+
data.getTrans_time+","+
data.getCategory+","+
data.getMerchant+","+
data.getAmt+","+
data.getMerch_lat+","+
data.getMerch_long

  //println(csvData)

  val p:Array[String] = csvData.split(",")
  var cc_num:String = p(0)
  var first:String = p(1)
  var last:String = p(2)
  var trans_num:String = p(3)
  var trans_time:String = p(4)
  var category:String = p(5)
  var merchant:String = p(6)
  var amt:String = p(7)
  var merch_lat:String = p(8)
  var merch_long:String = p(9)

  val creationDate: Time = new Time(System.currentTimeMillis())
  return new Tuple10(cc_num, first,
last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
}
  }

val data = kinesis.map(mapFunction)

//data.print()


tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,mer

Creating a Source function to read data continuously

2019-07-15 Thread Soheil Pourbafrani
Hi,

By extending the "RichInputFormat" class I could create my own MySQL input. I
want to use it to read data continuously from a table, but I observed that the
"RichInputFormat" class reads all the data and then finishes the job.

I guess that for reading data continuously I need to extend "SourceFunction",
but I observed that it has only two methods: run() and cancel().

So I was wondering: is it possible to implement a new class that reads data
from MySQL tables continuously, like what we can do with the Kafka connector?

Thanks


Automatic deployment of new version of streaming stateful job

2019-07-15 Thread Maxim Parkachov
Hi,

I'm trying to bring my first stateful streaming Flink job to production and
I'm having trouble understanding how to integrate it with a CI/CD pipeline. I
can cancel the job with a savepoint, but in order to start the new version of
the application I need to specify the savepoint path manually, right?

So basically my question is: what is the best practice for automatically
restarting or deploying a new version of a stateful streaming application?
Every tip is greatly appreciated.

Thanks,
Maxim.


Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread Caizhi Weng
Hi Kali,

Currently Flink treats all aggregate functions as retractable. As
`distinct` is an aggregation, the planner considers that it might update or
retract records (although from my perspective it won't...). Because the CSV
table sink is an append-only sink (it's hard to update what has already been
written in the middle of a file), the exception you mentioned occurs.

However, you can use the `toAppendStream` method to turn the retractable
stream into an append-only stream. For example,
`tEnv.sqlQuery(query).distinct().toAppendStream[Row]` gives you an append-only
stream, to which you can then add the CSV sink.
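
For reference, a minimal sketch of that conversion (assuming the Scala table
API is in scope and reusing the `tEnv` and `query` from your code; the output
path is a placeholder):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Convert the query result to an append-only DataStream[Row] and write it out.
// Note: this requires the Scala StreamTableEnvironment.
val appendStream: DataStream[Row] = tEnv.sqlQuery(query).toAppendStream[Row]
appendStream.writeAsText("/tmp/distinct-out") // placeholder path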

sri hari kali charan Tummala wrote on Tue, Jul 16, 2019 at 3:32 AM:

> Hi All,
>
> I am trying to read data from kinesis stream and applying SQL
> transformation (distinct) and then tryting to write to CSV sink which is
> failinf due to this issue (org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.) , full
> code is here (
> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
> ).
>
> can anyone help me moveforward on this issue?
>
> Full Code:-
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.createLocalEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", 
> new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]] =
>   new MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]]() {
>
> override def map(s: String): Tuple10[String, String, 
> String,String,String,String,String,String,String,String] = {
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num+","+
> data.getFirst+","+
> data.getLast+","+
> data.getTrans_num+","+
> data.getTrans_time+","+
> data.getCategory+","+
> data.getMerchant+","+
> data.getAmt+","+
> data.getMerch_lat+","+
> data.getMerch_long
>
>   //println(csvData)
>
>   val p:Array[String] = csvData.split(",")
>   var cc_num:String = p(0)
>   var first:String = p(1)
>   var last:String = p(2)
>   var trans_num:String = p(3)
>   var trans_time:String = p(4)
>   var category:String = p(5)
>   var merchant:String = p(6)
>   var amt:String = p(7)
>   var merch_lat:String = p(8)
>   var merch_long:String = p(9)
>
>   val creationDate: Time = new Time(System.currentTimeMillis())
>   return new Tuple10(cc_num, first, 
> last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
> }
>   }
>
> val data = kinesis.map(mapFunction)
>
> //data.print()
>
> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>
> val query = "SELECT distinct 
> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>  FROM transactions where cc_num not in ('cc_num')"
> val table = tEnv.sqlQuery(query)
>
> val table1 = table.distinct()
>
> tEnv.registerTable("fromAnotherTable",table1)
>
> table.printSchema()
>
> val csvSink:TableSink[Row]  = new 
> CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput","~");
> val fieldNames:Array[String]  = 
> Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_column","amt_column","merch_lat","merch_long")
> val fieldTypes:Array[TypeInformation[_]]  = Array(
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flin

Re: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

2019-07-15 Thread Caizhi Weng
Hi Kali,

What exception or error message do you get when executing the erroneous
step? Please paste it here so that we can investigate the problem.

sri hari kali charan Tummala wrote on Tue, Jul 16, 2019 at 4:49 AM:

> Hi ,
>
> I am trying to write flink table to streaming Sink it fails at casting
> Java to Scala or Scala to Java, it fails at below step can anyone help me
> out ? about this error.
>
>
> val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new 
> Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
>   new SimpleStringEncoder[Row]("UTF-8")).build()
>
> table.addSink(sink2)
>
>
> package com.aws.examples.kinesis.consumer.TransactionExample
>
> import java.util.Properties
>
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, 
> SimpleStringSchema}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import 
> org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, 
> ConsumerConfigConstants}
> import org.apache.flink.table.api.{Table, TableEnvironment}
> import com.google.gson.{Gson, JsonObject}
> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
> import java.sql.{DriverManager, Time}
>
> import com.aws.SchemaJavaClasses.Row1
> import org.apache.flink.types.Row
> import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
> import org.apache.flink.api.java.io.jdbc
> import 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
> import org.apache.flink.table.api.java._
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.table.sinks.TableSink
> import com.aws.customSinks.CsvCustomSink
> import org.apache.flink.core.fs.Path
>
> import scala.collection.JavaConversions._
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.java.StreamTableEnvironment
> import org.apache.flink.streaming.api.datastream.DataStream
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
> import com.aws.customSinks.CsvCustomSink
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>
> object KinesisConsumer {
>
>   def main(args: Array[String]): Unit = {
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.createLocalEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new 
> FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), 
> consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]] =
>   new MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]]() {
>
> override def map(s: String): Tuple10[String, String, 
> String,String,String,String,String,String,String,String] = {
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num+","+
> data.getFirst+","+
> data.getLast+","+
> data.getTrans_num+","+
> data.getTrans_time+","+
> data.getCategory+","+
> data.getMerchant+","+
> data.getAmt+","+
> data.getMerch_lat+","+
> data.getMerch_long
>
>   //println(csvData)
>
>   val p:Array[String] = csvData.split(",")
>   var cc_num:String = p(0)
>   var first:String = p(1)
>   var last:String = p(2)
>   var trans_num:String = p(3)
>   var trans_time:String = p(4)
>   var category:String = p(5)
>   var merchant:String = p(6)
>   var amt:String = p(7)
>   var merch_lat:String = p(8)
>   var me

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi Weng,

Another issue now (Exception in thread "main"
org.apache.flink.table.api.TableException: Only tables that originate from
Scala DataStreams can be converted to Scala DataStreams.). Here is the full
code:
https://github.com/kali786516/FlinkStreamAndSql/blob/15e5e60d6c044bc830f5ef2d79c071389e7460d1/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L128
and the pom: https://github.com/kali786516/FlinkStreamAndSql/blob/master/pom.xml

Exception in thread "main" org.apache.flink.table.api.TableException: Only
tables that originate from Scala DataStreams can be converted to Scala
DataStreams.
at
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:100)
at
com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:126)
at
com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

table.printSchema()

import org.apache.flink.streaming.api.scala._

val test1 = tEnv.sqlQuery(query).distinct().toAppendStream[Row]

test1.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3")


On Mon, Jul 15, 2019 at 9:52 PM Caizhi Weng  wrote:

> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala wrote on Tue, Jul 16, 2019 at 3:32 AM:
>
>> Hi All,
>>
>> I am trying to read data from kinesis stream and applying SQL
>> transformation (distinct) and then tryting to write to CSV sink which is
>> failinf due to this issue (org.apache.flink.table.api.TableException:
>> AppendStreamTableSink requires that Table has only insert changes.) , full
>> code is here (
>> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
>> ).
>>
>> can anyone help me moveforward on this issue?
>>
>> Full Code:-
>>
>> // set up the streaming execution environment
>> val env = StreamExecutionEnvironment.createLocalEnvironment
>> //env.enableCheckpointing(10)
>>
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>> // Get AWS credentials
>> val credentialsProvider = new DefaultAWSCredentialsProviderChain
>> val credentials = credentialsProvider.getCredentials
>>
>> // Configure Flink Kinesis consumer
>> val consumerConfig = new Properties
>> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
>> credentials.getAWSAccessKeyId)
>> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
>> credentials.getAWSSecretKey)
>> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
>> "TRIM_HORIZON")
>>
>> // Create Kinesis stream
>> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", 
>> new SimpleStringSchema(), consumerConfig))
>>
>> val mapFunction: MapFunction[String, Tuple10[String, String, 
>> String,String,String,String,String,String,String,String]] =
>>   new MapFunction[String, Tuple10[String, String, 
>> String,String,String,String,String,String,String,String]]() {
>>
>> override def map(s: String): Tuple10[String, String, 
>> String,String,String,String,String,String,String,String] = {
>>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>>
>>   val csvData = data.getCc_num+","+
>> data.getFirst+","+
>> data.getLast+","+
>> data.getTrans_num+","+
>> data.getTrans_time+","+
>> data.getCategory+","+
>> data.getMerchant+","+
>> data.getAmt+","+
>> data.getMerch_lat+","+
>> data.getMerch_long
>>
>>   //println(csvData)
>>
>>   val p:Array[String] = csvData.split(",")
>>   var cc_num:String = p(0)
>>   var first:String = p(1)
>>   var last:String = p(2)
>>   var trans_num:String = p(3)
>>   var trans_time:String = p(4)
>>   var category:String = p(5)
>>   var merchant:String = p(6)
>>   v

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread JingsongLee
Hi Caizhi and Kali,

I think this table should use toRetractStream instead of toAppendStream, and
you should handle the retract messages. (If you just use distinct, the
messages should always be accumulate messages.)
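
A minimal sketch of handling the retract flag (assuming the Scala table API
and the `table` from the original mail; the output path is a placeholder):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// toRetractStream emits (flag, row): flag == true is an accumulate (add)
// message, flag == false retracts a previously emitted row.
val retractStream: DataStream[(Boolean, Row)] = table.toRetractStream[Row]

retractStream
  .filter(_._1)         // keep only accumulate messages
  .map(_._2.toString)   // Row -> String; adapt the formatting to your CSV layout
  .writeAsText("/tmp/distinct-out") // placeholder path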

Best, JingsongLee


--
From: Caizhi Weng
Send Time: Tue, Jul 16, 2019 09:52
To: sri hari kali charan Tummala
Cc: user
Subject: Re: Stream to CSV Sink with SQL Distinct Values

Hi Kali,

Currently Flink treats all aggregate functions as retractable. As `distinct` is 
an aggregate function, it's considered by the planner that it might update or 
retract records (although from my perspective it won't...). Because csv table 
sink is an append only sink (it's hard to update what has been written in the 
middle of a file), the exception you mentioned occurs.

However, you can use `toAppendStream` method to change the retractable stream 
to an append only stream. For example, 
`tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get an 
append only stream. You can then add csv sink to this stream.
sri hari kali charan Tummala wrote on Tue, Jul 16, 2019 at 3:32 AM:

Hi All, 

I am trying to read data from kinesis stream and applying SQL transformation 
(distinct) and then tryting to write to CSV sink which is failinf due to this 
issue (org.apache.flink.table.api.TableException: AppendStreamTableSink 
requires that Table has only insert changes.) , full code is here 
(https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112).

can anyone help me moveforward on this issue?

Full Code:- 
// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new 
SimpleStringSchema(), consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String, 
String,String,String,String,String,String,String,String]] =
  new MapFunction[String, Tuple10[String, String, 
String,String,String,String,String,String,String,String]]() {

override def map(s: String): Tuple10[String, String, 
String,String,String,String,String,String,String,String] = {
  val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

  val csvData = data.getCc_num+","+
data.getFirst+","+
data.getLast+","+
data.getTrans_num+","+
data.getTrans_time+","+
data.getCategory+","+
data.getMerchant+","+
data.getAmt+","+
data.getMerch_lat+","+
data.getMerch_long

  //println(csvData)

  val p:Array[String] = csvData.split(",")
  var cc_num:String = p(0)
  var first:String = p(1)
  var last:String = p(2)
  var trans_num:String = p(3)
  var trans_time:String = p(4)
  var category:String = p(5)
  var merchant:String = p(6)
  var amt:String = p(7)
  var merch_lat:String = p(8)
  var merch_long:String = p(9)

  val creationDate: Time = new Time(System.currentTimeMillis())
  return new Tuple10(cc_num, first, 
last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
}
  }

val data = kinesis.map(mapFunction)

//data.print()

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct 
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
 FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

val table1 = table.distinct()

tEnv.registerTable("fromAnotherTable",table1)

table.printSchema()

val csvSink:TableSink[Row]  = new 
CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput","~");
val fieldNames:Array[String]  = 
Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_column","amt_column","merch_lat","merch_long")
val fieldTypes:Array[TypeInformation[_]]  = Array(
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.fli

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi Lee,

I did try:

Option 1:
It writes to the CSV file only if I kill the running job.

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3",
  FileSystem.WriteMode.OVERWRITE,"~","|")

OutPut:-
2>
(true,180094108369013,John,Holland,c1ad7a1b73172ef67bd24820438f3f93,2019-07-15
22:48:40,travel,Satterfield-Lowe,81,39.015861,-119.883595)

Option 2:
I tried several options; this workaround kind of works, but I need to strip
the brackets, the "true" flag, etc.

import java.io.PrintStream
val fileOut = new
PrintStream("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut2/out.txt")

System.setOut(fileOut)

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).print()

System.out.println(tEnv.toRetractStream(table,
classOf[org.apache.flink.types.Row]).print())


On Mon, Jul 15, 2019 at 10:03 PM JingsongLee 
wrote:

> Hi caizhi and kali:
>
> I think this table should use toRetractStream instead of toAppendStream,
> and you should handle the retract messages. (If you just use distinct, the
> message should always be accumulate message)
>
> Best, JingsongLee
>
> --
> From: Caizhi Weng
> Send Time: Tue, Jul 16, 2019 09:52
> To: sri hari kali charan Tummala
> Cc: user
> Subject: Re: Stream to CSV Sink with SQL Distinct Values
>
> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala wrote on Tue, Jul 16, 2019 at 3:32 AM:
> Hi All,
>
> I am trying to read data from kinesis stream and applying SQL
> transformation (distinct) and then tryting to write to CSV sink which is
> failinf due to this issue (org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.) , full
> code is here (
> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
> ).
>
> can anyone help me moveforward on this issue?
>
> Full Code:-
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.createLocalEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", 
> new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]] =
>   new MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]]() {
>
> override def map(s: String): Tuple10[String, String, 
> String,String,String,String,String,String,String,String] = {
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num+","+
> data.getFirst+","+
> data.getLast+","+
> data.getTrans_num+","+
> data.getTrans_time+","+
> data.getCategory+","+
> data.getMerchant+","+
> data.getAmt+","+
> data.getMerch_lat+","+
> data.getMerch_long
>
>   //println(csvData)
>
>   val p:Array[String] = csvData.split(",")
>   var cc_num:String = p(0)
>   var first:String = p(1)
>   var last:String = p(2)
>   var trans_num:String = p(3)
>   var trans_time:String = p(4)
>   var category:String = p(5)
>   var merchant:String = p(6)
>   var amt:String = p(7)
>   var merch_lat:String = p(8)
>   var merch_long:String = p(9)
>
>   val creationDate: Time = new Time(System.currentTimeMillis())
>   return new Tuple10(cc_num, first, 
> last,trans_num,trans_time,category,merchant,amt,merch

Re: Creating a Source function to read data continuously

2019-07-15 Thread Caizhi Weng
Hi Soheil,

It's not recommended to implement a streaming source using `InputFormat`
(it's mainly meant for batch sources). To implement a streaming source,
`SourceFunction` is recommended.

The Javadocs of `SourceFunction` clearly describe (with examples) how to
write the `run` and `cancel` methods. You can refer to them to write your own
MySQL streaming source.
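
For illustration, a minimal (non-checkpointed) polling sketch; the table
name, columns and poll interval below are hypothetical:

import java.sql.DriverManager

import org.apache.flink.streaming.api.functions.source.SourceFunction

// A bare-bones polling source: it repeatedly queries rows with an id greater
// than the last one seen and emits them. A real implementation should also
// checkpoint `lastId` (e.g. via CheckpointedFunction) to survive failures.
class MySqlPollingSource(jdbcUrl: String, user: String, password: String)
  extends SourceFunction[String] {

  @volatile private var running = true
  private var lastId = 0L

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val conn = DriverManager.getConnection(jdbcUrl, user, password)
    try {
      while (running) {
        val stmt = conn.prepareStatement(
          "SELECT id, payload FROM events WHERE id > ? ORDER BY id") // hypothetical table
        stmt.setLong(1, lastId)
        val rs = stmt.executeQuery()
        while (rs.next()) {
          ctx.getCheckpointLock.synchronized {
            ctx.collect(rs.getString("payload"))
            lastId = rs.getLong("id")
          }
        }
        rs.close(); stmt.close()
        Thread.sleep(1000) // poll interval
      }
    } finally conn.close()
  }

  override def cancel(): Unit = running = false
}

// Usage: env.addSource(new MySqlPollingSource(jdbcUrl, user, password))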

Soheil Pourbafrani wrote on Tue, Jul 16, 2019 at 7:29 AM:

> Hi,
>
> Extending the "RichInputFormat" class I could create my own MySQL input.
> I want to use it for reading data continuously from a table but I observed
> that the "RichInputFormat" class read all data and finish the job.
>
> I guess for reading data continuously I need to extend the
> "SourceFunction" but I observed that it has only two methods: the run() and
> the cancel()
>
> So I was wondering is it possible to implement a new class to read data
> from MySQL tables continuously? Like what we can do with Kafka connector
>
> Thanks
>


Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi Lee,

It writes only after the job is killed, and I also don't see all the
records. Is there a workaround?

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
FileSystem.WriteMode.NO_OVERWRITE,"~","|")


On Mon, Jul 15, 2019 at 10:03 PM JingsongLee 
wrote:

> Hi caizhi and kali:
>
> I think this table should use toRetractStream instead of toAppendStream,
> and you should handle the retract messages. (If you just use distinct, the
> message should always be accumulate message)
>
> Best, JingsongLee
>
> --
> From: Caizhi Weng
> Send Time: Tue, Jul 16, 2019 09:52
> To: sri hari kali charan Tummala
> Cc: user
> Subject: Re: Stream to CSV Sink with SQL Distinct Values
>
> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala wrote on Tue, Jul 16, 2019 at 3:32 AM:
> Hi All,
>
> I am trying to read data from kinesis stream and applying SQL
> transformation (distinct) and then tryting to write to CSV sink which is
> failinf due to this issue (org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.) , full
> code is here (
> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
> ).
>
> can anyone help me moveforward on this issue?
>
> Full Code:-
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.createLocalEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", 
> new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]] =
>   new MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]]() {
>
> override def map(s: String): Tuple10[String, String, 
> String,String,String,String,String,String,String,String] = {
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num+","+
> data.getFirst+","+
> data.getLast+","+
> data.getTrans_num+","+
> data.getTrans_time+","+
> data.getCategory+","+
> data.getMerchant+","+
> data.getAmt+","+
> data.getMerch_lat+","+
> data.getMerch_long
>
>   //println(csvData)
>
>   val p:Array[String] = csvData.split(",")
>   var cc_num:String = p(0)
>   var first:String = p(1)
>   var last:String = p(2)
>   var trans_num:String = p(3)
>   var trans_time:String = p(4)
>   var category:String = p(5)
>   var merchant:String = p(6)
>   var amt:String = p(7)
>   var merch_lat:String = p(8)
>   var merch_long:String = p(9)
>
>   val creationDate: Time = new Time(System.currentTimeMillis())
>   return new Tuple10(cc_num, first, 
> last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
> }
>   }
>
> val data = kinesis.map(mapFunction)
>
> //data.print()
>
> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>
> val query = "SELECT distinct 
> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>  FROM transactions where cc_num not in ('cc_num')"
> val table = tEnv.sqlQuery(query)
>
> val table1 = table.distinct()
>
> tEnv.registerTable("fromAnotherTable",tab

Re: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

2019-07-15 Thread sri hari kali charan Tummala
This is the error:

org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to
org.apache.flink.table.api.scala.StreamTableEnvironment


On Mon, Jul 15, 2019 at 9:54 PM Caizhi Weng  wrote:

> Hi Kali,
>
> What's the exception thrown or error message hinted when executing the
> erroneous step? Please print them here so that we can investigate the
> problem.
>
> sri hari kali charan Tummala wrote on Tue, Jul 16, 2019 at 4:49 AM:
>
>> Hi ,
>>
>> I am trying to write flink table to streaming Sink it fails at casting
>> Java to Scala or Scala to Java, it fails at below step can anyone help me
>> out ? about this error.
>>
>>
>> val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new 
>> Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
>>   new SimpleStringEncoder[Row]("UTF-8")).build()
>>
>> table.addSink(sink2)
>>
>>
>> package com.aws.examples.kinesis.consumer.TransactionExample
>>
>> import java.util.Properties
>>
>> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, 
>> SimpleStringSchema}
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
>> import 
>> org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, 
>> ConsumerConfigConstants}
>> import org.apache.flink.table.api.{Table, TableEnvironment}
>> import com.google.gson.{Gson, JsonObject}
>> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
>> import java.sql.{DriverManager, Time}
>>
>> import com.aws.SchemaJavaClasses.Row1
>> import org.apache.flink.types.Row
>> import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>> import org.apache.flink.table.api.scala._
>> import org.apache.flink.table.sinks.CsvTableSink
>> import org.apache.flink.api.java.io.jdbc
>> import 
>> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
>> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
>> import org.apache.flink.table.api.java._
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.api.java.DataSet
>> import org.apache.flink.table.sinks.TableSink
>> import com.aws.customSinks.CsvCustomSink
>> import org.apache.flink.core.fs.Path
>>
>> import scala.collection.JavaConversions._
>> import org.apache.flink.table.sources.CsvTableSource
>> import org.apache.flink.table.api.Table
>> import org.apache.flink.table.api.TableEnvironment
>> import org.apache.flink.table.api.java.StreamTableEnvironment
>> import org.apache.flink.streaming.api.datastream.DataStream
>> import 
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>> import com.aws.customSinks.CsvCustomSink
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>>
>> object KinesisConsumer {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> // set up the streaming execution environment
>> val env = StreamExecutionEnvironment.createLocalEnvironment
>> //env.enableCheckpointing(10)
>>
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>> // Get AWS credentials
>> val credentialsProvider = new DefaultAWSCredentialsProviderChain
>> val credentials = credentialsProvider.getCredentials
>>
>> // Configure Flink Kinesis consumer
>> val consumerConfig = new Properties
>> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
>> credentials.getAWSAccessKeyId)
>> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
>> credentials.getAWSSecretKey)
>> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
>> "TRIM_HORIZON")
>>
>> // Create Kinesis stream
>> val kinesis = env.addSource(new 
>> FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), 
>> consumerConfig))
>>
>> val mapFunction: MapFunction[String, Tuple10[String, String, 
>> String,String,String,String,String,String,String,String]] =
>>   new MapFunction[String, Tuple10[String, String, 
>> String,String,String,String,String,String,String,String]]() {
>>
>> override def map(s: String): Tuple10[String, String, 
>> String,String,String,String,String,String,String,String] = {
>>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>>
>>   val csvData = data.getCc_num+","+
>> data.getFirst+","+
>> data.getLast+","+
>> data.getTrans_num+","+
>> data.getTrans_time+","+
>> data.getCategory+","+
>> data.getMerchant+","+
>> data.getAmt+","+
>> data.getMerch_lat+","+
>> data.getMerch_long
>>
>>   //println(csvData)
>>
>>   val p:Array[String] = csvData.split(",")
>>   var cc_num:Strin

Re:Re: Creating a Source function to read data continuously

2019-07-15 Thread Haibo Sun
Hi, Soheil


As Caizhi said, to create a source that implements `SourceFunction`, you can
first take a closer look at the example in the Javadoc
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html).
Although `InputFormat` is not recommended for implementing a streaming source,
it can achieve continuous data reading; as for the job finishing after reading
all the data, I think that is an issue in your implementation. In addition, a
custom source can also implement or extend `RichSourceFunction`,
`ParallelSourceFunction`, `RichParallelSourceFunction`, etc.
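
A skeleton of the Rich variant, for illustration only (the lifecycle methods
manage the JDBC connection; run() would contain a polling loop like the one
sketched in the earlier reply):

import java.sql.{Connection, DriverManager}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// open()/close() manage the JDBC connection; run() does the actual reading.
class RichMySqlSource(jdbcUrl: String, user: String, password: String)
  extends RichSourceFunction[String] {

  @volatile private var running = true
  @transient private var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(jdbcUrl, user, password)
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit =
    while (running) {
      // query `conn` and ctx.collect(...) here, as in the earlier polling sketch
      Thread.sleep(1000)
    }

  override def cancel(): Unit = running = false

  override def close(): Unit = if (conn != null) conn.close()
}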


I don't know how you plan to achieve continuous reading. Maybe you can also
look at the implementation of `ContinuousFileMonitoringFunction`:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java


I hope this will help you.


Best,
Haibo

At 2019-07-16 10:12:21, "Caizhi Weng"  wrote:

Hi Soheil,


It's not recommended to implement a streaming source using `InputFormat` (it's 
mainly used for batch source). To implement a streaming source, 
`SourceFunction` is recommended.


It's clearly written (with examples) in the java docs in `SourceFucntion` how 
to write a `run` and `cancel` method. You can refer to that to write your own 
MySQL streaming source.


Soheil Pourbafrani wrote on Tue, Jul 16, 2019 at 7:29 AM:

Hi,


Extending the "RichInputFormat" class I could create my own MySQL input. I want 
to use it for reading data continuously from a table but I observed that the 
"RichInputFormat" class read all data and finish the job.


I guess for reading data continuously I need to extend the "SourceFunction" but 
I observed that it has only two methods: the run() and the cancel()


So I was wondering is it possible to implement a new class to read data from 
MySQL tables continuously? Like what we can do with Kafka connector


Thanks

Re: Creating a Source function to read data continuously

2019-07-15 Thread Biao Liu
Hi Soheil,

I assume that you are using the `DataStream` API. Please check the
documentation [1] for more information; the other replies have already
covered a lot about this.

Regardless of the interfaces, I'm just wondering how you could read a MySQL
table "continuously". Kafka can be used as a message queue, which makes it
convenient to get incremental messages. How do you plan to do that based on a
MySQL table? Through the binary log?

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/datastream_api.html#data-sources


Caizhi Weng wrote on Tue, Jul 16, 2019 at 10:12 AM:

> Hi Soheil,
>
> It's not recommended to implement a streaming source using `InputFormat`
> (it's mainly used for batch source). To implement a streaming source,
> `SourceFunction` is recommended.
>
> It's clearly written (with examples) in the java docs in `SourceFucntion`
> how to write a `run` and `cancel` method. You can refer to that to write
> your own MySQL streaming source.
>
> Soheil Pourbafrani wrote on Tue, Jul 16, 2019 at 7:29 AM:
>
>> Hi,
>>
>> Extending the "RichInputFormat" class I could create my own MySQL input.
>> I want to use it for reading data continuously from a table but I observed
>> that the "RichInputFormat" class read all data and finish the job.
>>
>> I guess for reading data continuously I need to extend the
>> "SourceFunction" but I observed that it has only two methods: the run() and
>> the cancel()
>>
>> So I was wondering is it possible to implement a new class to read data
>> from MySQL tables continuously? Like what we can do with Kafka connector
>>
>> Thanks
>>
>


Job leak in attached mode (batch scenario)

2019-07-15 Thread qi luo
Hi guys,

We run thousands of Flink batch jobs every day. The batch jobs are submitted in
attached mode, so we can know from the client when a job finishes and then take
further actions. To respond to user abort actions, we submit the jobs with
"--shutdownOnAttachedExit" so the Flink cluster can be shut down when the client
exits.

However, in some cases when the Flink client exits abnormally (such as on OOM),
the shutdown signal will not be sent to the Flink cluster, causing a "job leak".
The lingering Flink job will continue to run and never end, consuming a large
amount of resources and possibly even producing unexpected results.

Does Flink have any mechanism to handle such a scenario (e.g. Spark's client
mode, where the driver runs on the client side, so the job exits when the
client exits)? Any idea will be much appreciated!

Thanks,
Qi