Re: Flink Operator in Golang?

2022-11-17 Thread kant kodali
Golang doesn't seem to have anything similar to Flink or Spark.

On Thu, Nov 17, 2022 at 8:11 PM Mark Lee  wrote:

> I got it, Thanks Zhanghao!
>
>
>
> *From:* user-return-51640-lifuqiong00=126@flink.apache.org
>  *on behalf of *
> zhanghao.c...@outlook.com
> *Sent:* November 17, 2022 23:36
> *To:* Mark Lee ; user@flink.apache.org
> *Subject:* Re: Flink Operator in Golang?
>
>
>
> Hi Mark,
>
>
>
>1. Directly quoting from
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator
>:
>
> Main reasons for choosing Java over Go
>
> ·   Direct access to Flink Client libraries for submitting, managing
> jobs and handling errors
>
> ·   Most Flink developers have strong Java experience while there are
> only few Go experts
>
> ·   Easier to integrate with existing build system and tooling
>
> ·   Required k8s clients and tools for building an operator are also
> available in Java
>
>
>    2. Unfortunately, the Golang API is not supported yet
>
>
>
>
>
> Best,
>
> Zhanghao Chen
> --
>
> *From:* Mark Lee 
> *Sent:* Thursday, November 17, 2022 16:16
> *To:* user@flink.apache.org 
> *Subject:* Flink Operator in Golang?
>
>
>
> Hi,
>
>   I found we already have Flink operator implemented by java. But I have
> two questions:
>
> 1. Can we implement a Flink operator using Golang? Are there any
> hidden traps?
>
> 2. We can submit Java jar jobs or SQL jobs; can we submit Golang jobs?
>
>
>
> Thank you.
>
>
>


Re: what is the difference between map vs process on a datastream?

2020-03-17 Thread kant kodali
Got it, and thanks a lot for that. So there is no difference between
flatMap and process then?

On Tue, Mar 17, 2020 at 5:29 AM David Anderson  wrote:

> Map applies a MapFunction (or a RichMapFunction) to a DataStream and does
> a one-to-one transformation of the stream elements.
>
> Process applies a ProcessFunction, which can produce zero, one, or many
> events in response to each event. And when used on a keyed stream, a
> KeyedProcessFunction can use Timers to defer actions until later, based
> either on watermarks or the time-of-day clock. A ProcessFunction can also
> have side outputs.
>
> Both RichMapFunctions and KeyedProcessFunctions can use keyed state.
>
> Process is strictly more powerful -- there's nothing you can do with map
> that you couldn't do with process instead. The same is true for flatmap,
> which is similar to map, but with a Collector that can be used to
> emit zero, one, or many events in response to each event, just like a
> process function.
>
> David
>
>
> On Tue, Mar 17, 2020 at 11:50 AM kant kodali  wrote:
>
>> what is the difference between map vs process on a datastream? they look
>> very similar.
>>
>> Thanks!
>>
>>
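
A minimal sketch (not from the thread above; class and variable names are made
up) showing the same length computation written with map, flatMap, and a
ProcessFunction, to illustrate the differences David describes:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class MapVsProcessSketch {

    public static void examples(DataStream<String> input) {
        // map: exactly one output record per input record
        SingleOutputStreamOperator<Integer> mapped = input.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        });

        // flatMap: zero, one, or many output records per input record via the Collector
        SingleOutputStreamOperator<Integer> flatMapped = input.flatMap(new FlatMapFunction<String, Integer>() {
            @Override
            public void flatMap(String value, Collector<Integer> out) {
                if (!value.isEmpty()) {
                    out.collect(value.length()); // emit conditionally; could emit several times
                }
            }
        });

        // process: same Collector-style output, plus access to the element's timestamp,
        // side outputs, and (on keyed streams with KeyedProcessFunction) timers and keyed state
        SingleOutputStreamOperator<Integer> processed = input.process(new ProcessFunction<String, Integer>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Integer> out) {
                out.collect(value.length());
            }
        });

        mapped.print();
        flatMapped.print();
        processed.print();
    }
}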


Re: Apache Airflow - Question about checkpointing and re-run a job

2020-03-17 Thread kant kodali
Does Airflow have a Flink Operator? I am not seeing one. Can you please
point me to it?

On Mon, Nov 18, 2019 at 3:10 AM M Singh  wrote:

> Thanks Congxian for your answer and reference.  Mans
>
> On Sunday, November 17, 2019, 08:59:16 PM EST, Congxian Qiu <
> qcx978132...@gmail.com> wrote:
>
>
> Hi
> Yes, the checkpoint data is located under the job id directory. You can try to restore from
> the retained checkpoint [1].
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
> Best,
> Congxian
>
>
> On Mon, Nov 18, 2019 at 2:54 AM M Singh  wrote:
>
> Folks - Please let me know if you have any advice on this question.  Thanks
>
> On Saturday, November 16, 2019, 02:39:18 PM EST, M Singh <
> mans2si...@yahoo.com> wrote:
>
>
> Hi:
>
> I have a Flink job and sometimes I need to cancel and re-run it.  From
> what I understand, the checkpoints for a job are saved under the job id
> directory at the checkpoint location. If I run the same job again, it will
> get a new job id, and the checkpoint saved from the previous run (which
> is saved under the previous job's id dir) will not be used for this new
> run. Is that a correct understanding? If I need to re-run the job from
> the previous checkpoint, is there any way to do that automatically without
> using a savepoint?
>
> Also, I believe internal job restarts do not change the job id, so in
> those cases the restarted job will pick up the state from the saved
> checkpoint.  Is my understanding correct?
>
> Thanks
>
> Mans
>
>
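
As a concrete illustration of [1] above (a sketch, not from the thread; the
job body is a placeholder): checkpoints are only retained after cancellation
if that is enabled explicitly, and a later submission can then be started from
the retained checkpoint with `flink run -s <checkpoint-path>`.

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // take a checkpoint every 10 seconds
        env.enableCheckpointing(10_000);

        // keep the latest checkpoint when the job is cancelled, so a new submission
        // (which gets a new job id) can be resumed from it:
        //   flink run -s <path-to-retained-checkpoint> ...
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.fromElements(1, 2, 3).print(); // placeholder pipeline, replace with the real job

        env.execute("job-with-retained-checkpoints");
    }
}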


what is the difference between map vs process on a datastream?

2020-03-17 Thread kant kodali
what is the difference between map vs process on a datastream? they look
very similar.

Thanks!


a question on window trigger and delta output

2020-03-15 Thread kant kodali
Hi All,

I set up a transformation like this; my events in the stream have sequential
timestamps like 1, 2, 3, and I set the time characteristic to event time.

    myStream
        .keyBy(0)
        .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
        .aggregate(new myAggregateFunction())
        .print()

This stream prints only when there are events with timestamps that are
multiples of 1000. This is not quite what I want, because I want to see some
output every second, so I tried this:

    myStream
        .map(new PartitionMapper<>()).returns(typeInfo)
        .keyBy(0)
        .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))

This looked much better; however, it prints the entire state every second,
which makes sense because that's what ContinuousProcessingTimeTrigger is
supposed to do. What I am looking for is a combination of both: trigger only
if there are some events in the window, with a max timeout of 1 second. If
there are no events in the window then don't trigger, because I don't want to
see the same output every second.

What would be the right trigger here (something that neither waits until the
watermark arrives nor prints the same output every second)?

Also, I don't want to output the whole state in the transformation above;
instead I only want to output the delta from the previous trigger to the
current trigger. I looked into DeltaTrigger and DeltaEvictor and am confused
by what the threshold parameter does; I am also looking for some explanation
of what they do.

In other words, I want output similar to retractStream() when an outer join
between two tables is executed, since it outputs only the delta and only when
there are some events (it neither waits until the watermark arrives nor
prints the same output every second).

Thanks!
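
One possible direction, sketched against the Trigger API (untested; the class
name and interval handling are made up, and this is not an authoritative
answer from the list): a processing-time trigger that only fires when at
least one element has arrived since the last firing, so idle windows stay
silent. It could replace ContinuousProcessingTimeTrigger above via
.trigger(new FireOnActivityTrigger(1000)). The delta-output part would still
need separate handling, e.g. remembering the previously emitted result inside
the AggregateFunction's accumulator.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires at most once per intervalMs of processing time, and only if at least
// one element arrived since the last firing, so idle windows produce no output.
public class FireOnActivityTrigger extends Trigger<Object, TimeWindow> {

    private final long intervalMs;
    private final ValueStateDescriptor<Boolean> pendingDesc =
            new ValueStateDescriptor<>("pending", Types.BOOLEAN);

    public FireOnActivityTrigger(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Boolean> pending = ctx.getPartitionedState(pendingDesc);
        if (pending.value() == null) {
            // first element since the last firing: schedule one processing-time timer
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + intervalMs);
            pending.update(true);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // this timer only exists because elements arrived; fire and reset the marker
        ctx.getPartitionedState(pendingDesc).clear();
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(pendingDesc).clear();
    }
}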


Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread kant kodali
>> how to generate *watermarks*.
>>>
>>> Internally, *ingestion time* is treated much like *event time*, but
>>> with automatic timestamp assignment and automatic watermark generation.
>>>
>>
>> So it's neither possible to assign timestamps nor watermark, but it seems
>> as if the default behavior is exactly as you want it to be. If that doesn't
>> work for you, could you please rephrase your last question or describe your
>> use case? I didn't get it.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html
>>
>> On Tue, Mar 10, 2020 at 5:01 AM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> Do I need to set assignTimestampsAndWatermarks if I set my time
>>> characteristic to IngestionTime?
>>>
>>> say I set my time characteristic of stream execution environment to
>>> Ingestion time as follows
>>>
>>>
>>> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>
>>> do I need to call
>>> datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?
>>>
>>> I thought datastream.assignTimestampsAndWatermarks is mandatory only if
>>> the time characteristic is event time. No? Did this behavior change in Flink
>>> 1.10? Because I see libraries not setting
>>> datastream.assignTimestampsAndWatermarks when the time characteristic is
>>> ingestion time but they do for event time. If not, I am wondering how I can
>>> set AscendingTimestampExtractor in a distributed environment? Is there
>>> any way to add a monotonically increasing long (AscendingTimestampExtractor)
>>> without any distributed locks?
>>>
>>> Thanks!
>>>
>>
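
Restating the answer above as code (a minimal sketch, not from the thread;
the data and window size are made up): with ingestion time, timestamps and
watermarks are attached automatically at the sources, so no
assignTimestampsAndWatermarks call is needed.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class IngestionTimeSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // timestamps and watermarks are generated automatically as records enter the pipeline
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3))
                // no assignTimestampsAndWatermarks(...) here -- time windows still work
                .keyBy(0)
                .timeWindow(Time.seconds(1))
                .sum(1)
                .print();

        env.execute("ingestion-time-sketch");
    }
}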


Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-09 Thread kant kodali
Hi All,

Do I need to set assignTimestampsAndWatermarks if I set my time
characteristic to IngestionTime?

say I set my time characteristic of stream execution environment to
Ingestion time as follows

streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

do I need to call
datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?

I thought datastream.assignTimestampsAndWatermarks is mandatory only if
the time characteristic is event time. No? Did this behavior change in Flink
1.10? Because I see libraries not setting
datastream.assignTimestampsAndWatermarks when the time characteristic is
ingestion time but they do for event time. If not, I am wondering how I can
set AscendingTimestampExtractor in a distributed environment? Is there
any way to add a monotonically increasing long (AscendingTimestampExtractor)
without any distributed locks?

Thanks!


How do custom objects/data structures get mapped into rocksdb underneath?

2020-03-09 Thread kant kodali
Hi All,

I want to do stateful streaming and I was wondering how custom objects get
mapped into RocksDB?

say I have the following class that represents my state

public class MyState {
private HashMap map1 ; // T can be any type
private HashMap map2; // S can be any type
}

I wonder how these two maps get mapped into RocksDB, and how does Flink
know that map1 and map2 together are part of my state rather than
individually in isolation?

Thanks!
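
For background, and not part of the original exchange: Flink only persists
state that is registered through its state API; plain fields on a user class
are not tracked on their own. A sketch of what registering the two maps as
keyed MapState might look like; the class name is made up and String/Long are
stand-ins for T and S.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MyStateFunction extends KeyedProcessFunction<String, String, String> {

    // each descriptor becomes its own column family in RocksDB,
    // scoped to the current key of the keyed stream
    private transient MapState<String, String> map1;
    private transient MapState<String, Long> map2;

    @Override
    public void open(Configuration parameters) {
        map1 = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("map1", Types.STRING, Types.STRING));
        map2 = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("map2", Types.STRING, Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // entries are serialized per key/value pair and written to RocksDB
        map1.put(value, value.toUpperCase());
        map2.put(value, (map2.contains(value) ? map2.get(value) : 0L) + 1L);
        out.collect(value);
    }
}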


Re: How to print the aggregated state every time it is updated?

2020-03-06 Thread kant kodali
Hi,

Thanks for this. So how can I emulate an infinite window while outputting
every second? Simply put, I want to store the state forever (say years), and
since RocksDB is my state backend I am assuming I can keep the state until
I run out of disk. However, I want to see all the updates to the state every
second. It sounds to me like I need to have a window of one second, compute
for that window and pass it on to the next window, or is there some other way?

Thanks

On Fri, Mar 6, 2020 at 1:33 AM Congxian Qiu  wrote:

> Hi
>
> From the description, you use a window operator and set the time characteristic
> to event time, so you should call `DataStream.assignTimestampsAndWatermarks` to
> set the timestamp and watermark.
> The window is triggered when the watermark exceeds the window end time.
>
> Best,
> Congxian
>
>
> On Wed, Mar 4, 2020 at 5:11 AM kant kodali  wrote:
>
>> Hi All,
>>
>> I have a custom aggregated state that is represented by a Set, and I
>> have a stream of values coming in from Kafka where I inspect, compute the
>> custom aggregation and store it in the Set. Now, I am trying to figure out
>> how to print the updated value every time this state is updated?
>>
>> Imagine I have a Datastream>
>>
>> I tried a few things already but keep running into the following exception.
>> Not sure why. Do I need to call assignTimestampsAndWatermarks? I thought
>> watermarks are not mandatory in Flink, especially when I want to keep this
>> aggregated state forever. Any simple code sample on how to print the
>> streaming aggregated state represented by Datastream> would be
>> great! You can imagine my Set has a toString() method that takes
>> care of printing, and I just want to see those values in stdout.
>>
>> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
>> timestamp (= no timestamp marker). Is the time characteristic set to
>> 'ProcessingTime', or did you forget to call
>> 'DataStream.assignTimestampsAndWatermarks(...)'?
>>
>
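
Not from the thread, but one common way to see every update without windows,
watermarks, or timestamps at all is to hold the aggregate in keyed state and
emit it from processElement whenever it changes. A rough sketch with made-up
types (a Set of strings standing in for the custom Set), used as
stream.keyBy(...).process(new EmitOnUpdate()).print():

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits the whole aggregated Set every time an element changes it.
// Works with processing time; no timestamps or watermarks are required.
public class EmitOnUpdate extends KeyedProcessFunction<String, String, Set<String>> {

    private transient ValueState<Set<String>> aggregate;

    @Override
    public void open(Configuration parameters) {
        aggregate = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "aggregate", TypeInformation.of(new TypeHint<Set<String>>() {})));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Set<String>> out) throws Exception {
        Set<String> current = aggregate.value();
        if (current == null) {
            current = new HashSet<>();
        }
        if (current.add(value)) {          // only emit when the state actually changed
            aggregate.update(current);
            out.collect(current);          // a downstream .print() then shows every update
        }
    }
}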


How to print the aggregated state every time it is updated?

2020-03-03 Thread kant kodali
Hi All,

I have a custom aggregated state that is represented by a Set, and I have
a stream of values coming in from Kafka where I inspect, compute the custom
aggregation and store it in the Set. Now, I am trying to figure out how to
print the updated value every time this state is updated?

Imagine I have a Datastream>

I tried a few things already but keep running into the following exception.
Not sure why. Do I need to call assignTimestampsAndWatermarks? I thought
watermarks are not mandatory in Flink, especially when I want to keep this
aggregated state forever. Any simple code sample on how to print the
streaming aggregated state represented by Datastream> would be
great! You can imagine my Set has a toString() method that takes
care of printing, and I just want to see those values in stdout.

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp
(= no timestamp marker). Is the time characteristic set to
'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?


Re: java.util.concurrent.ExecutionException

2020-03-03 Thread kant kodali
Hi Gary,

This has to do with my Kafka. After restarting Kafka it seems to work fine!

Thanks!

On Tue, Mar 3, 2020 at 8:18 AM kant kodali  wrote:

> The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>
> at Test.main(Test.java:71)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
> ... 8 more
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>

Re: java.util.concurrent.ExecutionException

2020-03-03 Thread kant kodali
:484)

at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)

at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by:
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException:
Timeout of 6ms expired before the position for partition edges-0 could
be determined

On Tue, Mar 3, 2020 at 8:03 AM Gary Yao  wrote:

> Hi,
>
> Can you post the complete stacktrace?
>
> Best,
> Gary
>
> On Tue, Mar 3, 2020 at 1:08 PM kant kodali  wrote:
>
>> Hi All,
>>
>> I am just trying to read edges which has the following format in Kafka
>>
>> 1,2
>> 1,3
>> 1,5
>>
>> using the Table API and then converting to DataStream of Edge Objects and
>> printing them. However I am getting
>> java.util.concurrent.ExecutionException but not sure why?
>>
>> Here is the sample code
>>
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>> import org.apache.flink.graph.Edge;
>> import org.apache.flink.runtime.state.StateBackend;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.*;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.descriptors.Csv;
>> import org.apache.flink.table.descriptors.Kafka;
>> import org.apache.flink.table.descriptors.Schema;
>> import org.apache.flink.types.NullValue;
>> import org.apache.flink.types.Row;
>>
>> import java.util.UUID;
>>
>> public class Test {
>>
>> public static void main(String... args) throws Exception {
>>
>> EnvironmentSettings bsSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStateBackend((StateBackend) new 
>> RocksDBStateBackend("file:///tmp/rocksdb"));
>>
>> StreamTableEnvironment bsTableEnv = 
>> StreamTableEnvironment.create(env, bsSettings);
>>
>> bsTableEnv.connect(
>> new Kafka()
>> .property("bootstrap.servers", "localhost:9092")
>> .property("zookeeper.connect", "localhost:2181")
>> .property("group.id", UUID.randomUUID().toString())
>> .startFromEarliest()
>> .version("universal")
>> .topic("edges")
>> )
>> .withFormat(new Csv().fieldDelimiter(','))
>> .withSchema(
>> new Schema()
>> .field("

zookeeper.connect is not needed but Flink requires it

2020-03-03 Thread kant kodali
Hi All,

The zookeeper.connect property is not needed for KafkaConsumer or KafkaAdminClient;
however, Flink requires it. You can also see in the Flink TaskManager logs
that the KafkaConsumer does not recognize this property anyway.

bsTableEnv.connect(
new Kafka()
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")


2020-03-03 03:48:54,644 WARN
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig
- The configuration 'zookeeper.connect' was supplied but isn't a known config.


How is state stored in rocksdb?

2020-03-02 Thread kant kodali
Hi All,

I am wondering how Flink serializes and deserializes state from RocksDB.
What is the format used?

For example, say I am doing some stateful streaming and an object of my
class below represents a state. How does Flink serialize and deserialize
the object of MyClass below? Is it just Java or Kryo serialization? If so,
is it queryable?

public class MyClass {
private Map map1 = new HashMap<>();
private Map map2 = new HashMap<>();
}

Thanks!
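
As a side note (not from the thread): the format depends on the TypeSerializer
Flink derives for the state's TypeInformation. A POJO gets Flink's own
PojoSerializer, while anything Flink cannot analyze falls back to Kryo. A small
sketch for checking what a given class would get; the class and its concrete
map types are made up for illustration.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.HashMap;
import java.util.Map;

public class SerializerCheck {

    // a POJO: public no-arg constructor plus public fields (or getters/setters)
    public static class MyClass {
        public Map<String, Long> map1 = new HashMap<>();
        public Map<String, String> map2 = new HashMap<>();
    }

    public static void main(String[] args) {
        TypeInformation<MyClass> typeInfo = TypeInformation.of(MyClass.class);
        TypeSerializer<MyClass> serializer = typeInfo.createSerializer(new ExecutionConfig());

        // PojoTypeInfo/PojoSerializer means Flink's own format; GenericTypeInfo means Kryo
        System.out.println(typeInfo.getClass().getSimpleName());
        System.out.println(serializer.getClass().getSimpleName());
    }
}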


Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-02 Thread kant kodali
Hi Arvid,

Yes, I got it, and it works as described in my previous email.

Thanks!


On Mon, Mar 2, 2020 at 12:10 AM Arvid Heise  wrote:

> Hi Kant,
>
> I think Dawid meant to not add the Kafka version number like this:
>
> flinkShadowJar 
> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>
>
> On Sun, Mar 1, 2020 at 7:31 PM kant kodali  wrote:
>
>> * What went wrong:
>> Could not determine the dependencies of task ':shadowJar'.
>> > Could not resolve all dependencies for configuration ':flinkShadowJar'.
>>> Could not find
>> org.apache.flink:flink-sql-connector-kafka_2.11:universal.
>>  Searched in the following locations:
>>-
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>>-
>> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>>-
>> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
>>-
>> https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
>>  Required by:
>>  project :
>>
>>
>>
>> On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Kant,
>>>
>>> If you want to use the *universal *kafka connector you use "universal"
>>> for the version. The community decided to no longer distinguish different
>>> kafka connector versions, but to use the newest kafka client version for
>>> all versions of kafka 1.0+. So if you want to use the connector from
>>> flink-sql-connector-kafka_2.11 use "universal" for the version.
>>>
>>> As for the collect/print sink. We do realize importance of the sink and
>>> there were a few approaches to implement one. Including the TableUtils
>>> mentioned by godfrey. It does not have strong consistency guarantees and is
>>> recommended rather only for experiments/testing. There is also an ongoing
>>> discussion how to implement such a sink for *both *batch and streaming
>>> here:
>>> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17046455#comment-17046455
>>>
>>> Best,
>>>
>>> Dawid
>>> On 01/03/2020 12:00, kant kodali wrote:
>>>
>>> Hi Benchao,
>>>
>>> That worked! Pasting the build.gradle file here. However this only works
>>> for 0.11 and it needs zookeeper.connect() which shouldn't be required. not
>>> sure why it is required in Flink Kafka connector?  If I change the version
>>> to 2.2 in the code and specify this jar
>>>
>>> flinkShadowJar 
>>> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>>>
>>> or
>>>
>>> flinkShadowJar 
>>> "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" 
>>> //Not sure if I should use this one for Kafka >= 0.11
>>>
>>> It doesn't work either.
>>>
>>>
>>> buildscript {repositories {jcenter() // this applies only to 
>>> the Gradle 'Shadow' plugin}dependencies {classpath 
>>> 'com.github.jengelman.gradle.plugins:shadow:2.0.4'}}plugins {id 
>>> 'java'id 'application'}mainClassName = 'Test'apply plugin: 
>>> 'com.github.johnrengelman.shadow'ext {javaVersion = '1.8'
>>> flinkVersion = '1.10.0'scalaBinaryVersion = '2.11'slf4jVersion = 
>>> '1.7.7'log4jVersion = '1.2.17'}sourceCompatibility = 
>>> javaVersiontargetCompatibility = javaVersiontasks.withType(JavaCompile) {   
>>>  options.encoding = 'UTF-8'}applicationDefaultJvmArgs = 
>>> ["-Dlog4j.configuration=log4j.properties"]
>>> // declare where to find the dependencies of your projectrepositories {
>>> mavenCentral()
>>> maven { url 
>>> "https://repository.apache.org/content/repositories/snapshots/; }}// NOTE: 
>>> We cannot use "compileOnly" or "shadow" configurations since then we could 
>>> not run code// in the IDE or with "gradle run". We also cannot exclude 
>>> transitive dependencies from the// shadowJar yet (see 
>>> https://github.com/johnrengelman/shad
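
To make the "universal" point above concrete (a sketch assembled from the
snippets in this thread; topic and server names are placeholders): "universal"
goes into the Kafka descriptor's version(), while the jar coordinate keeps the
Flink version, e.g. flink-sql-connector-kafka_2.11:1.10.0, plus flink-csv for
the CSV format.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class UniversalKafkaSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.connect(
                new Kafka()
                        .version("universal")            // not "0.11", and not part of the jar coordinate
                        .topic("test-topic1")
                        .property("bootstrap.servers", "localhost:9092")
                        .startFromEarliest())
                .withFormat(new Csv())                   // needs flink-csv on the classpath
                .withSchema(new Schema().field("f0", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("kafka_source");

        Table result = tEnv.sqlQuery("select * from kafka_source");
        tEnv.toAppendStream(result, Row.class).print();

        env.execute("universal-kafka-sketch");
    }
}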

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread kant kodali
* What went wrong:
Could not determine the dependencies of task ':shadowJar'.
> Could not resolve all dependencies for configuration ':flinkShadowJar'.
   > Could not find
org.apache.flink:flink-sql-connector-kafka_2.11:universal.
 Searched in the following locations:
   -
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
   -
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
   -
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
   -
https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
 Required by:
 project :



On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz 
wrote:

> Hi Kant,
>
> If you want to use the *universal *kafka connector you use "universal"
> for the version. The community decided to no longer distinguish different
> kafka connector versions, but to use the newest kafka client version for
> all versions of kafka 1.0+. So if you want to use the connector from
> flink-sql-connector-kafka_2.11 use "universal" for the version.
>
> As for the collect/print sink. We do realize importance of the sink and
> there were a few approaches to implement one. Including the TableUtils
> mentioned by godfrey. It does not have strong consistency guarantees and is
> recommended rather only for experiments/testing. There is also an ongoing
> discussion how to implement such a sink for *both *batch and streaming
> here:
> https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17046455#comment-17046455
>
> Best,
>
> Dawid
> On 01/03/2020 12:00, kant kodali wrote:
>
> Hi Benchao,
>
> That worked! Pasting the build.gradle file here. However this only works
> for 0.11 and it needs zookeeper.connect() which shouldn't be required. not
> sure why it is required in Flink Kafka connector?  If I change the version
> to 2.2 in the code and specify this jar
>
> flinkShadowJar 
> "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
>
> or
>
> flinkShadowJar 
> "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" //Not 
> sure if I should use this one for Kafka >= 0.11
>
> It doesn't work either.
>
>
> buildscript {repositories {jcenter() // this applies only to the 
> Gradle 'Shadow' plugin}dependencies {classpath 
> 'com.github.jengelman.gradle.plugins:shadow:2.0.4'}}plugins {id 
> 'java'id 'application'}mainClassName = 'Test'apply plugin: 
> 'com.github.johnrengelman.shadow'ext {javaVersion = '1.8'flinkVersion 
> = '1.10.0'scalaBinaryVersion = '2.11'slf4jVersion = '1.7.7'
> log4jVersion = '1.2.17'}sourceCompatibility = javaVersiontargetCompatibility 
> = javaVersiontasks.withType(JavaCompile) {options.encoding = 
> 'UTF-8'}applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
> // declare where to find the dependencies of your projectrepositories {
> mavenCentral()
> maven { url 
> "https://repository.apache.org/content/repositories/snapshots/; }}// NOTE: We 
> cannot use "compileOnly" or "shadow" configurations since then we could not 
> run code// in the IDE or with "gradle run". We also cannot exclude transitive 
> dependencies from the// shadowJar yet (see 
> https://github.com/johnrengelman/shadow/issues/159).// -> Explicitly define 
> the // libraries we want to be included in the "flinkShadowJar" 
> configuration!configurations {flinkShadowJar // dependencies which go 
> into the shadowJar// always exclude these (also from transitive 
> dependencies) since they are provided by FlinkflinkShadowJar.exclude 
> group: 'org.apache.flink', module: 'force-shading'flinkShadowJar.exclude 
> group: 'com.google.code.findbugs', module: 'jsr305'flinkShadowJar.exclude 
> group: 'org.slf4j'flinkShadowJar.exclude group: 'log4j'}// declare the 
> dependencies for your production and test codedependencies {// 
> --// 
> Compile-time dependencies that should NOT be part of the// shadow jar and 
> are provided in the lib folder of Flink// 
> --compile 
> "org.apache.flink:flink-

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread kant kodali
manifest {
attributes 'Built-By': System.getProperty('user.name'),
'Build-Jdk': System.getProperty('java.version')
}
}

shadowJar {
configurations = [project.configurations.flinkShadowJar]
mergeServiceFiles()
manifest {
attributes 'Main-Class': mainClassName
}
}






On Sun, Mar 1, 2020 at 1:38 AM Benchao Li  wrote:

> I don't know how gradle works, but in Maven, packaging dependencies into
> one fat jar needs to specify how SPI property files should be dealt with,
> like
>
> <transformers>
>   <transformer
> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
> </transformers>
>
> Could you check that your final jar contains correct resource file?
>
> On Sun, Mar 1, 2020 at 5:25 PM godfrey he  wrote:
>
>> I think you should use `flink-sql-connector-kafka-0.11_2.11` instead
>> of `flink-connector-kafka_2.11`.
>>
>> Bests,
>> Godfrey
>>
>> On Sun, Mar 1, 2020 at 5:15 PM kant kodali  wrote:
>>
>>> The dependency was already there. Below is my build.gradle. Also I
>>> checked the kafka version and looks like the jar
>>>
>>> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>>>
>>> downloads kafka-clients version 2.2.0. So I changed my code to version
>>> 2.2.0 and same problem persists.
>>>
>>> buildscript {
>>> repositories {
>>> jcenter() // this applies only to the Gradle 'Shadow' plugin
>>> }
>>> dependencies {
>>> classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>>> }
>>> }
>>>
>>> plugins {
>>> id 'java'
>>> id 'application'
>>> }
>>>
>>> mainClassName = 'Test'
>>> apply plugin: 'com.github.johnrengelman.shadow'
>>>
>>> ext {
>>> javaVersion = '1.8'
>>> flinkVersion = '1.10.0'
>>> scalaBinaryVersion = '2.11'
>>> slf4jVersion = '1.7.7'
>>> log4jVersion = '1.2.17'
>>> }
>>>
>>>
>>> sourceCompatibility = javaVersion
>>> targetCompatibility = javaVersion
>>> tasks.withType(JavaCompile) {
>>> options.encoding = 'UTF-8'
>>> }
>>>
>>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>>
>>> // declare where to find the dependencies of your project
>>> repositories {
>>> mavenCentral()
>>> maven { url 
>>> "https://repository.apache.org/content/repositories/snapshots/; }
>>> }
>>>
>>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then 
>>> we could not run code
>>> // in the IDE or with "gradle run". We also cannot exclude transitive 
>>> dependencies from the
>>> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
>>> // -> Explicitly define the // libraries we want to be included in the 
>>> "flinkShadowJar" configuration!
>>> configurations {
>>> flinkShadowJar // dependencies which go into the shadowJar
>>>
>>> // always exclude these (also from transitive dependencies) since they 
>>> are provided by Flink
>>> flinkShadowJar.exclude group: 'org.apache.flink', module: 
>>> 'force-shading'
>>> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 
>>> 'jsr305'
>>> flinkShadowJar.exclude group: 'org.slf4j'
>>> flinkShadowJar.exclude group: 'log4j'
>>> }
>>>
>>> // declare the dependencies for your production and test code
>>> dependencies {
>>> // --
>>> // Compile-time dependencies that should NOT be part of the
>>> // shadow jar and are provided in the lib folder of Flink
>>> // --
>>> compile "org.apache.flink:flink-java:${flinkVersion}"
>>> compile 
>>> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>>
>>> // --
>>> // Dependencies that should be part of the shadow jar, e.g.
>>> // connectors. These must be in the flinkShadowJar configuration!
>>> // --
>>>
>>> compile "org.apache.flink:flink-java:${flinkVersion}&quo

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-01 Thread kant kodali
onstration or
> testing and is only applicable for *small batch jobs* and small finite *append
> only stream jobs*.  code like:
> Table table = tEnv.sqlQuery("select ...");
> List result = TableUtils.collectToList(table);
> result.
>
> currently, we are planner to implement Table#collect[1], after
> that Table#head and Table#print may be also introduced soon.
>
> >  The program finished with the following exception:
> please make sure that the kafka version in Test class and the kafka
> version in pom dependency are same. I tested your code successfully.
>
> Bests,
> Godfrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-14807
>
>
> On Sun, Mar 1, 2020 at 4:44 PM Benchao Li  wrote:
>
>> Hi kant,
>>
>> CSV format is an independent module, you need to add it as your
>> dependency.
>>
>> <dependency>
>>    <groupId>org.apache.flink</groupId>
>>    <artifactId>flink-csv</artifactId>
>>    <version>${flink.version}</version>
>> </dependency>
>>
>>
>> On Sun, Mar 1, 2020 at 3:43 PM kant kodali  wrote:
>>
>>> 
>>>  The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: findAndCreateTableSource failed.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>> at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>> Caused by: org.apache.flink.table.api.TableException:
>>> findAndCreateTableSource failed.
>>> at
>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>>> at
>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>>> at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>> at
>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>> at
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(Parser

Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
 at 11:35 PM Piotr Nowojski  wrote:

> Hi,
>
> > Thanks for the pointer. Looks like the documentation says to use
> tableEnv.registerTableSink however in my IDE it shows the method is
> deprecated in Flink 1.10.
>
> It looks like not all of the documentation was updated after methods were
> deprecated. However if you look at the java docs of the `registerTableSink`
> method, you can find an answer [1].
>
> >  It sill doesnt work because it says for CSV the connector.type should
> be filesystem not Kafka.
>
> Can you post the full stack trace? As I’m not familiar with the Table API,
> maybe you Timo or Dawid know what’s going on here?
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
>
> On 1 Mar 2020, at 07:50, kant kodali  wrote:
>
> Here is my updated code after digging through the source code (not sure if
> it is correct). It still doesn't work because it says for CSV the
> connector.type should be filesystem, not kafka, but the documentation says it is
> supported.
>
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
>
> public class Test {
>
> public static void main(String... args) throws Exception {
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
>
> StreamExecutionEnvironment streamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> streamExecutionEnvironment.setStateBackend((StateBackend) new 
> RocksDBStateBackend("file:///tmp/rocksdb"));
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
> tableEnvironment
> .connect(
> new Kafka()
> .version("0.11")
> .topic("test-topic1")
> )
> .withFormat(new Csv())
> .withSchema(new Schema().field("f0", DataTypes.STRING()))
> .inAppendMode()
> .createTemporaryTable("kafka_source");
>
> Table resultTable = tableEnvironment.sqlQuery("select * from 
> kafka_source");
>
> tableEnvironment
> .connect(
> new Kafka()
> .version("0.11")
> .topic("test-topic2")
> )
> .withFormat(new Csv())
> .withSchema(new Schema().field("f0", DataTypes.STRING()))
> .inAppendMode()
> .createTemporaryTable("kafka_target");
>
> tableEnvironment.insertInto("kafka_target", resultTable);
>
> tableEnvironment.execute("Sample Job");
> }
> }
>
>
> On Sat, Feb 29, 2020 at 7:48 PM kant kodali  wrote:
>
>> Hi Benchao,
>>
>> Agreed a ConsoleSink is very useful but that is not the only problem
>> here. Documentation says use  tableEnv.registerTableSink all over the place
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>> however that function is deprecated. so how do I add any other Sink?
>>
>> Thanks!
>>
>>
>>
>>
>>
>> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li  wrote:
>>
>>> Hi kant,
>>>
>>> AFAIK, there is no "print to stdout" sink for Table API now, you can
>>> implement one custom sink following this doc[1].
>>>
>>> IMO, an out-of-box print table sink is very useful, and I've created an
>>> issue[2] to track this.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>>
>>> On Sun, Mar 1, 2020 at 2:30 AM kant kodali  wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the pointer. Looks like the 

Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Here is my updated code after digging through the source code (not sure if
it is correct). It still doesn't work because it says for CSV the
connector.type should be filesystem, not kafka, but the documentation says it is
supported.


import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class Test {

public static void main(String... args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamExecutionEnvironment streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setStateBackend((StateBackend) new
RocksDBStateBackend("file:///tmp/rocksdb"));
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(streamExecutionEnvironment, settings);

tableEnvironment
.connect(
new Kafka()
.version("0.11")
.topic("test-topic1")
)
.withFormat(new Csv())
.withSchema(new Schema().field("f0", DataTypes.STRING()))
.inAppendMode()
.createTemporaryTable("kafka_source");

Table resultTable = tableEnvironment.sqlQuery("select * from
kafka_source");

tableEnvironment
.connect(
new Kafka()
.version("0.11")
.topic("test-topic2")
)
.withFormat(new Csv())
.withSchema(new Schema().field("f0", DataTypes.STRING()))
.inAppendMode()
.createTemporaryTable("kafka_target");

tableEnvironment.insertInto("kafka_target", resultTable);

tableEnvironment.execute("Sample Job");
}
}


On Sat, Feb 29, 2020 at 7:48 PM kant kodali  wrote:

> Hi Benchao,
>
> Agreed a ConsoleSink is very useful but that is not the only problem here.
> Documentation says use  tableEnv.registerTableSink all over the place
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
> however that function is deprecated. so how do I add any other Sink?
>
> Thanks!
>
>
>
>
>
> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li  wrote:
>
>> Hi kant,
>>
>> AFAIK, there is no "print to stdout" sink for Table API now, you can
>> implement one custom sink following this doc[1].
>>
>> IMO, an out-of-box print table sink is very useful, and I've created an
>> issue[2] to track this.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>
>> On Sun, Mar 1, 2020 at 2:30 AM kant kodali  wrote:
>>
>>> Hi,
>>>
>>> Thanks for the pointer. Looks like the documentation says to use
>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>> deprecated in Flink 1.10. so I am still not seeing a way to add a sink that
>>> can print to stdout? what sink should I use to print to stdout and how do I
>>> add it without converting into DataStream?
>>>
>>> Thanks!
>>>
>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal.
>>>> It’s not part of any public API.
>>>>
>>>> You don’t have to convert DataStream into Table to read from Kafka in
>>>> Table API. I guess you could, if you had used DataStream API’s
>>>> FlinkKafkaConsumer as it’s documented here [1].
>>>>
>>>> But you should be able to use Kafka Table connector directly, as it is
>>>> described in the docs [2][3].
>>>>
>>>> Piotrek
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>>> [3]
>>>> https://ci.apache.org/pr

Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Hi Benchao,

Agreed, a ConsoleSink is very useful, but that is not the only problem here.
The documentation says to use tableEnv.registerTableSink all over the place
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
however that function is deprecated, so how do I add any other sink?

Thanks!





On Sat, Feb 29, 2020 at 6:05 PM Benchao Li  wrote:

> Hi kant,
>
> AFAIK, there is no "print to stdout" sink for Table API now, you can
> implement one custom sink following this doc[1].
>
> IMO, an out-of-box print table sink is very useful, and I've created an
> issue[2] to track this.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
> [2] https://issues.apache.org/jira/browse/FLINK-16354
>
> On Sun, Mar 1, 2020 at 2:30 AM kant kodali  wrote:
>
>> Hi,
>>
>> Thanks for the pointer. Looks like the documentation says to use
>> tableEnv.registerTableSink however in my IDE it shows the method is
>> deprecated in Flink 1.10. so I am still not seeing a way to add a sink that
>> can print to stdout? what sink should I use to print to stdout and how do I
>> add it without converting into DataStream?
>>
>> Thanks!
>>
>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s
>>> not part of any public API.
>>>
>>> You don’t have to convert DataStream into Table to read from Kafka in
>>> Table API. I guess you could, if you had used DataStream API’s
>>> FlinkKafkaConsumer as it’s documented here [1].
>>>
>>> But you should be able to use Kafka Table connector directly, as it is
>>> described in the docs [2][3].
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>
>>> On 29 Feb 2020, at 12:54, kant kodali  wrote:
>>>
>>> Also why do I need to convert to DataStream to print the rows of a
>>> table? Why not have a print method in the Table itself?
>>>
>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali  wrote:
>>>
>>>> Hi All,
>>>>
>>>> Do I need to use DataStream API or Table API to construct sources? I am
>>>> just trying to read from Kafka and print it to console. And yes I tried it
>>>> with datastreams and it works fine but I want to do it using Table related
>>>> APIs. I don't see any documentation or a sample on how to create Kafka
>>>> table source or any other source using Table Source API's so after some
>>>> digging I wrote the following code. My ultimate goal is to avoid Datastream
>>>> API as much as possible and just use Table API & SQL but somehow I feel the
>>>> Flink framework focuses on DataStream than the SQL interface. am I wrong?
>>>> From the user perspective wouldn't it make more sense to focus on SQL
>>>> interfaces for both streaming and batch?
>>>>
>>>>
>>>> import 
>>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>>> import 
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>>> import org.apache.flink.table.api.DataTypes;
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.Table;
>>>> import org.apache.flink.table.api.TableSchema;
>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>> import org.apache.flink.table.sources.TableSource;
>>>> import org.apache.flink.types.Row;
>>>>
>>>> import java.io.IOException;
>>>> import java.util.Properties;
>>>>
>>>> public class Test {
>>>>
>>>> public class MyDeserializationSchema extends 
>>>> AbstractDeserializationSchema {
>>>> @Override
>>>> public Row deserialize(byte[] message) throws IOException {
>>>> return Row.of(new String(message));
>>>> }
>>>> }
>>>>
>>>>   

Is CSV format supported for Kafka in Flink 1.10?

2020-02-29 Thread kant kodali
Hi,

Is the CSV format supported for Kafka in Flink 1.10? The error says I need to
specify connector.type as filesystem, but the documentation says CSV is
supported for Kafka.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class Test {

public static void main(String... args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamExecutionEnvironment streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setStateBackend((StateBackend) new
RocksDBStateBackend("file:///tmp/rocksdb"));
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(streamExecutionEnvironment, settings);

tableEnvironment
.connect(
new Kafka()
.version("0.11")
.topic("test-topic1")
)
.withFormat(new Csv())
.withSchema(new Schema().field("f0", DataTypes.STRING()))
.inAppendMode()
.createTemporaryTable("kafka_source");

Table resultTable = tableEnvironment.sqlQuery("select * from
kafka_source");
tableEnvironment.toAppendStream(resultTable, Row.class).print();

tableEnvironment.execute("Sample Job");
}
}


This code generates the following error

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.property-version=1
connector.topic=test-topic1
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=f0
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 34 more


Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Hi,

Thanks for the pointer. Looks like the documentation says to use
tableEnv.registerTableSink; however, in my IDE it shows the method is
deprecated in Flink 1.10, so I am still not seeing a way to add a sink that
can print to stdout. What sink should I use to print to stdout, and how do I
add it without converting into a DataStream?

Thanks!

On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski  wrote:

> Hi,
>
> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s
> not part of any public API.
>
> You don’t have to convert DataStream into Table to read from Kafka in
> Table API. I guess you could, if you had used DataStream API’s
> FlinkKafkaConsumer as it’s documented here [1].
>
> But you should be able to use Kafka Table connector directly, as it is
> described in the docs [2][3].
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>
> On 29 Feb 2020, at 12:54, kant kodali  wrote:
>
> Also why do I need to convert to DataStream to print the rows of a table?
> Why not have a print method in the Table itself?
>
> On Sat, Feb 29, 2020 at 3:40 AM kant kodali  wrote:
>
>> Hi All,
>>
>> Do I need to use DataStream API or Table API to construct sources? I am
>> just trying to read from Kafka and print it to console. And yes I tried it
>> with datastreams and it works fine but I want to do it using Table related
>> APIs. I don't see any documentation or a sample on how to create Kafka
>> table source or any other source using Table Source API's so after some
>> digging I wrote the following code. My ultimate goal is to avoid Datastream
>> API as much as possible and just use Table API & SQL but somehow I feel the
>> Flink framework focuses on DataStream than the SQL interface. am I wrong?
>> From the user perspective wouldn't it make more sense to focus on SQL
>> interfaces for both streaming and batch?
>>
>>
>> import 
>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>> import org.apache.flink.table.api.DataTypes;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableSchema;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.sources.TableSource;
>> import org.apache.flink.types.Row;
>>
>> import java.io.IOException;
>> import java.util.Properties;
>>
>> public class Test {
>>
>> public class MyDeserializationSchema extends 
>> AbstractDeserializationSchema {
>> @Override
>> public Row deserialize(byte[] message) throws IOException {
>> return Row.of(new String(message));
>> }
>> }
>>
>> public static void main(String... args) throws Exception {
>> Test test = new Test();
>> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build();
>>
>> StreamExecutionEnvironment streamExecutionEnvironment = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tableEnvironment = 
>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>
>> TableSource tableSource = test.getKafkaTableSource();
>> Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>> tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>
>> Table resultTable = tableEnvironment.sqlQuery("select * from 
>> kafka_source");
>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>
>> streamExecutionEnvironment.execute("Sample Job");
>> }
>>
>> public KafkaTableSource getKafkaTableSource() {
>> TableSchema tableSchema = TableSchema.builder().field("f0", 
>> DataTypes.STRING()).build();
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> pro

Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Also why do I need to convert to DataStream to print the rows of a table?
Why not have a print method in the Table itself?

On Sat, Feb 29, 2020 at 3:40 AM kant kodali  wrote:

> Hi All,
>
> Do I need to use DataStream API or Table API to construct sources? I am
> just trying to read from Kafka and print it to console. And yes I tried it
> with datastreams and it works fine but I want to do it using Table related
> APIs. I don't see any documentation or a sample on how to create Kafka
> table source or any other source using Table Source API's so after some
> digging I wrote the following code. My ultimate goal is to avoid Datastream
> API as much as possible and just use Table API & SQL but somehow I feel the
> Flink framework focuses on DataStream than the SQL interface. am I wrong?
> From the user perspective wouldn't it make more sense to focus on SQL
> interfaces for both streaming and batch?
>
>
> import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sources.TableSource;
> import org.apache.flink.types.Row;
>
> import java.io.IOException;
> import java.util.Properties;
>
> public class Test {
>
> public class MyDeserializationSchema extends 
> AbstractDeserializationSchema {
> @Override
> public Row deserialize(byte[] message) throws IOException {
> return Row.of(new String(message));
> }
> }
>
> public static void main(String... args) throws Exception {
> Test test = new Test();
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
>
> StreamExecutionEnvironment streamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
> TableSource tableSource = test.getKafkaTableSource();
> Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
> tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>
> Table resultTable = tableEnvironment.sqlQuery("select * from 
> kafka_source");
> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>
> streamExecutionEnvironment.execute("Sample Job");
> }
>
> public KafkaTableSource getKafkaTableSource() {
> TableSchema tableSchema = TableSchema.builder().field("f0", 
> DataTypes.STRING()).build();
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
> return new KafkaTableSource(tableSchema, "test-topic1", properties, 
> new MyDeserializationSchema());
> }
> }
>
>
> I get the following error
>
> The program finished with the following exception:
>
> The implementation of the FlinkKafkaConsumerBase is not serializable. The
> object probably contains or references non serializable fields.
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>
> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>

Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Hi All,

Do I need to use the DataStream API or the Table API to construct sources? I am
just trying to read from Kafka and print it to the console. And yes, I tried it
with DataStreams and it works fine, but I want to do it using the Table-related
APIs. I don't see any documentation or a sample on how to create a Kafka
table source, or any other source, using the TableSource APIs, so after some
digging I wrote the following code. My ultimate goal is to avoid the DataStream
API as much as possible and just use Table API & SQL, but somehow I feel the
Flink framework focuses more on DataStream than on the SQL interface. Am I wrong?
From the user perspective, wouldn't it make more sense to focus on SQL
interfaces for both streaming and batch?


import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.Properties;

public class Test {

public class MyDeserializationSchema extends
AbstractDeserializationSchema {
@Override
public Row deserialize(byte[] message) throws IOException {
return Row.of(new String(message));
}
}

public static void main(String... args) throws Exception {
Test test = new Test();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamExecutionEnvironment streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(streamExecutionEnvironment, settings);

TableSource tableSource = test.getKafkaTableSource();
Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
tableEnvironment.createTemporaryView("kafka_source", kafkaTable);

Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
tableEnvironment.toAppendStream(resultTable, Row.class).print();

streamExecutionEnvironment.execute("Sample Job");
}

public KafkaTableSource getKafkaTableSource() {
TableSchema tableSchema = TableSchema.builder().field("f0",
DataTypes.STRING()).build();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
return new KafkaTableSource(tableSchema, "test-topic1",
properties, new MyDeserializationSchema());
}
}


I get the following error

The program finished with the following exception:

The implementation of the FlinkKafkaConsumerBase is not serializable. The
object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)

Re: The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-28 Thread kant kodali
Hi Jark,

You mean I shouldn't package them into the jar, so I need to specify them as
compileOnly, as Lake Shen pointed out? Because I still need them to compile my
application in the IDE. I just tried it and yes, it works; below is the updated
build.gradle

buildscript {
repositories {
jcenter() // this applies only to the Gradle 'Shadow' plugin
}
dependencies {
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
}
}

plugins {
id 'java'
id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

// artifact properties
/*group = 'org.myorg.quickstart'
version = '0.1'*/
description = """Flink Quickstart Job"""

ext {
javaVersion = '1.8'
flinkVersion = '1.10.0'
scalaBinaryVersion = '2.11'
slf4jVersion = '1.7.7'
log4jVersion = '1.2.17'
}


sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
mavenCentral()
maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since
then we could not run code
// in the IDE or with "gradle run". We also cannot exclude transitive
dependencies from the
// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the // libraries we want to be included in the
"flinkShadowJar" configuration!

configurations {
flinkShadowJar // dependencies which go into the shadowJar

// always exclude these (also from transitive dependencies) since
they are provided by Flink
flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
flinkShadowJar.exclude group: 'org.slf4j'
flinkShadowJar.exclude group: 'log4j'
flinkShadowJar.exclude group: 'org.codehaus.janino'
}

// declare the dependencies for your production and test code
dependencies {
// --
// Compile-time dependencies that should NOT be part of the
// shadow jar and are provided in the lib folder of Flink
// --
compile "org.apache.flink:flink-java:${flinkVersion}"
compile 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
compileOnly
"org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
flinkShadowJar
"org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"

// --
// Dependencies that should be part of the shadow jar, e.g.
// connectors. These must be in the flinkShadowJar configuration!
// --
//flinkShadowJar
"org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"

compile "log4j:log4j:${log4jVersion}"
compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

// Add test dependencies here.
// testCompile "junit:junit:4.12"
}

// make compileOnly dependencies available for tests:
sourceSets {
main.compileClasspath += configurations.flinkShadowJar
main.runtimeClasspath += configurations.flinkShadowJar

test.compileClasspath += configurations.flinkShadowJar
test.runtimeClasspath += configurations.flinkShadowJar

javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
manifest {
attributes 'Built-By': System.getProperty('user.name'),
'Build-Jdk': System.getProperty('java.version')
}
}

shadowJar {
configurations = [project.configurations.flinkShadowJar]
}


On Fri, Feb 28, 2020 at 1:09 AM Jark Wu  wrote:

> Hi Kant,
>
> You shouldn't compile `flink-table-planner` or `flink-table-planner-blink`
> into your user jar. They have been provided by Flink cluster.
>
> Best,
> Jark
>
> On Fri, 28 Feb 2020 at 15:28, kant kodali  wrote:
>
>> Here is my build.gradle and I am not sure which jar uses
>> org.codehaus.commons.compiler.ICompilerFactory
>>
>> buildscript {
>> repositories {
>> jcenter() // this applies only to the Gradle 'Shadow' plugin
>> }
>> depend

Re: Flink 1.10 exception : Unable to instantiate java compiler

2020-02-27 Thread kant kodali
Same problem!

On Thu, Feb 27, 2020 at 11:10 PM LakeShen  wrote:

> Hi community,
>   I am now using Flink 1.10 to run a Flink task; the cluster type is YARN.
> I use the command line to submit my Flink job, like this:
>
> flink run  -m yarn-cluster  --allowNonRestoredState  -c xxx.xxx.xx
>  flink-stream-xxx.jar
>
> But an exception is thrown; the exception info is:
>
> *org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Unable to instantiate java compiler*
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.lang.IllegalStateException: Unable to instantiate java
> compiler
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
> at
> org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> 

Re: The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-27 Thread kant kodali
Here is my build.gradle and I am not sure which jar uses
org.codehaus.commons.compiler.ICompilerFactory

buildscript {
repositories {
jcenter() // this applies only to the Gradle 'Shadow' plugin
}
dependencies {
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
}
}

plugins {
id 'java'
id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

// artifact properties
/*group = 'org.myorg.quickstart'
version = '0.1'*/
description = """Flink Quickstart Job"""

ext {
javaVersion = '1.8'
flinkVersion = '1.10.0'
scalaBinaryVersion = '2.11'
slf4jVersion = '1.7.7'
log4jVersion = '1.2.17'
}


sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
mavenCentral()
maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since
then we could not run code
// in the IDE or with "gradle run". We also cannot exclude transitive
dependencies from the
// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the // libraries we want to be included in the
"flinkShadowJar" configuration!

configurations {
flinkShadowJar // dependencies which go into the shadowJar

// always exclude these (also from transitive dependencies) since
they are provided by Flink
flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
flinkShadowJar.exclude group: 'org.slf4j'
flinkShadowJar.exclude group: 'log4j'
}

// declare the dependencies for your production and test code
dependencies {
// --
// Compile-time dependencies that should NOT be part of the
// shadow jar and are provided in the lib folder of Flink
// --
compile "org.apache.flink:flink-java:${flinkVersion}"
compile 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
flinkShadowJar
"org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
flinkShadowJar
"org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"

// --
// Dependencies that should be part of the shadow jar, e.g.
// connectors. These must be in the flinkShadowJar configuration!
// --
//flinkShadowJar
"org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"

compile "log4j:log4j:${log4jVersion}"
compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

// Add test dependencies here.
// testCompile "junit:junit:4.12"
}

// make compileOnly dependencies available for tests:
sourceSets {
main.compileClasspath += configurations.flinkShadowJar
main.runtimeClasspath += configurations.flinkShadowJar

test.compileClasspath += configurations.flinkShadowJar
test.runtimeClasspath += configurations.flinkShadowJar

javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
manifest {
attributes 'Built-By': System.getProperty('user.name'),
'Build-Jdk': System.getProperty('java.version')
}
}

shadowJar {
configurations = [project.configurations.flinkShadowJar]
}


On Thu, Feb 27, 2020 at 10:31 PM Jingsong Li  wrote:

> Hi kant,
>
> As Jark said,
> Your user jar should not contains "
> org.codehaus.commons.compiler.ICompilerFactory" dependencies. This will
> make calcite can not work.
>
> In 1.10, have made Flink client respect classloading policy that default
> policy is child first [1]. More details can find in [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
> [2] https://issues.apache.org/jira/browse/FLINK-13749
>
> Best,
> Jingsong Lee
>
> On Fri, Feb 28, 2020 at 11:19 AM Jark Wu  wrote:
>
>> Hi Kant,
>>
>> Are you depending a custom janino or something like hive-exec in your

Re: The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-27 Thread kant kodali
It works within the IDE but not when I submit from the command line using
flink run myApp.jar



On Thu, Feb 27, 2020 at 3:32 PM kant kodali  wrote:

> Below is the sample code using Flink 1.10
>
> public class Test {
>
> public static void main(String... args) throws Exception {
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend((StateBackend) new 
> RocksDBStateBackend("file:///tmp"));
>
>
> StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(env, bsSettings);
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
>
> FlinkKafkaConsumer consumer1 = new FlinkKafkaConsumer<>(
> java.util.regex.Pattern.compile("test-topic1"),
> new SimpleStringSchema(),
> properties);
>
> FlinkKafkaConsumer consumer2 = new FlinkKafkaConsumer<>(
> java.util.regex.Pattern.compile("test-topic2"),
> new SimpleStringSchema(),
> properties);
>
> DataStream stream1 = env.addSource(consumer1);
> DataStream stream2 = env.addSource(consumer2);
>
> bsTableEnv.createTemporaryView("sample1", stream1);
> bsTableEnv.createTemporaryView("sample2", stream2);
>
> Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 INNER JOIN 
> sample2 on sample1.f0=sample2.f0");
> result.printSchema();
>
>
> bsTableEnv.toRetractStream(result, Row.class).print();
> bsTableEnv.execute("sample job");
> }
> }
>
>
> On Thu, Feb 27, 2020 at 3:22 PM kant kodali  wrote:
>
>> Fixed the typo.
>>
>>
>> Hi All,
>>
>>
>> My sample program works in Flink 1.9 but in 1.10 I get the following
>> error when I am submitting the job. otherwords it fails to submit a job.
>> any idea?
>>
>>
>> Thanks!
>>
>> On Thu, Feb 27, 2020 at 2:19 PM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>>
>>> My sample program works in Flink 1.9 but in 1.0 I get the following
>>> error when I am submitting the job. otherwords it fails to submit a job.
>>> any idea?
>>>
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Unable to instantiate java compiler
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>>
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>>
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>>
>>> at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>>
>>> Caused by: java.lang.IllegalStateException: Unable to instantiate java
>>> compiler
>>>
>>> at
>>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
>>>
>>> at
>>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
>>>
>>> at
>>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
>>>
>>> at
>>> org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
>>>
>>> at
>>> org.apache.flink.calcite.shaded.com.go

Re: The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-27 Thread kant kodali
Below is the sample code using Flink 1.10

public class Test {

public static void main(String... args) throws Exception {

EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend((StateBackend) new
RocksDBStateBackend("file:///tmp"));


StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(env, bsSettings);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer consumer1 = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic1"),
new SimpleStringSchema(),
properties);

FlinkKafkaConsumer consumer2 = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic2"),
new SimpleStringSchema(),
properties);

DataStream stream1 = env.addSource(consumer1);
DataStream stream2 = env.addSource(consumer2);

bsTableEnv.createTemporaryView("sample1", stream1);
bsTableEnv.createTemporaryView("sample2", stream2);

Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 INNER JOIN sample2 on sample1.f0=sample2.f0");
result.printSchema();


bsTableEnv.toRetractStream(result, Row.class).print();
bsTableEnv.execute("sample job");
}
}


On Thu, Feb 27, 2020 at 3:22 PM kant kodali  wrote:

> Fixed the typo.
>
>
> Hi All,
>
>
> My sample program works in Flink 1.9 but in 1.10 I get the following error
> when I am submitting the job. otherwords it fails to submit a job. any idea?
>
>
> Thanks!
>
> On Thu, Feb 27, 2020 at 2:19 PM kant kodali  wrote:
>
>> Hi All,
>>
>>
>> My sample program works in Flink 1.9 but in 1.0 I get the following error
>> when I am submitting the job. otherwords it fails to submit a job. any idea?
>>
>>
>> Thanks!
>>
>>
>>
>>
>>
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Unable to instantiate java compiler
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>
>> Caused by: java.lang.IllegalStateException: Unable to instantiate java
>> compiler
>>
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
>>
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
>>
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
>>
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
>>
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
>>
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
>>
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
>>
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
>>
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
>>
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
&g

Re: The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-27 Thread kant kodali
Fixed the typo.


Hi All,


My sample program works in Flink 1.9 but in 1.10 I get the following error
when I am submitting the job. In other words, it fails to submit the job. Any idea?


Thanks!

On Thu, Feb 27, 2020 at 2:19 PM kant kodali  wrote:

> Hi All,
>
>
> My sample program works in Flink 1.9 but in 1.0 I get the following error
> when I am submitting the job. otherwords it fails to submit a job. any idea?
>
>
> Thanks!
>
>
>
>
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Unable to instantiate java compiler
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
> Caused by: java.lang.IllegalStateException: Unable to instantiate java
> compiler
>
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
>
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
>
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
>
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
>
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
>
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
>
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
>
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
>
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
>
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
>
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
>
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
>
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
>
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
>
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
>
> at
> org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
>
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
>
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(Trav

The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-27 Thread kant kodali
Hi All,


My sample program works in Flink 1.9 but in 1.0 I get the following error
when I am submitting the job. In other words, it fails to submit the job. Any idea?


Thanks!







org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Unable to instantiate java compiler

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)

at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)

Caused by: java.lang.IllegalStateException: Unable to instantiate java
compiler

at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)

at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)

at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)

at
org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)

at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)

at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)

at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)

at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)

at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)

at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)

at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)

at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)

at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)

at
org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)

at
org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)

at
org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)

at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)

at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)

at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)

at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)

at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)

at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)

at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)

at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)

at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)

at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)

at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)

at

Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-02-27 Thread kant kodali
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/hive_catalog.html#hivecatalog

Can I use the hive catalog to store view definitions in HDFS? I am assuming
the metastore can be anything, or does it have to be MySQL?

On Thu, Feb 27, 2020 at 4:46 AM kant kodali  wrote:

> Hi,
>
> 1) Where does the hive catalog persist view definitions? in mysql? or HDFS?
>
> 2) If the views are not persisted what happens if the application crashes
> and restarted? will it create the view again and safely read the data from
> where it left off?
>
> Thanks!
>
> On Wed, Feb 26, 2020 at 6:47 AM godfrey he  wrote:
>
>> Hi Kant, if you want the store the catalog data in Local Filesystem/HDFS,
>> you can implement a user defined catalog (just need to implement Catalog
>> interface)
>>
>> Bests,
>> Godfrey
>>
>> On Wed, Feb 26, 2020 at 12:28 PM, kant kodali wrote:
>>
>>> Hi Jingsong,
>>>
>>> Can I store it in Local Filesystem/HDFS?
>>>
>>> Thanks!
>>>
>>> On Mon, Jan 20, 2020 at 6:59 PM Jingsong Li 
>>> wrote:
>>>
>>>> Hi Kant,
>>>>
>>>> If you want your view persisted, you must to dock a catalog like hive
>>>> catalog, it stores views in the metastore with mysql.
>>>> - In 1.10, you can store views in catalog through
>>>> "Catalog.createTable", you can create a "CatalogViewImpl". This is an
>>>> internal API, which is not easy to use.
>>>> - In 1.11, we will introduce create view DDL for "TableEnv.sqlUpdate"
>>>> and "TableEnv.createView". It will be easy to use.
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Tue, Jan 21, 2020 at 10:03 AM Jark Wu  wrote:
>>>>
>>>>> Hi Kant,
>>>>>
>>>>> The TableEnv#createTemporaryView and CREATE VIEW in SQL Cli both
>>>>> creates temporary views which is not persisted and will lost after session
>>>>> close.
>>>>> I think the persisted views will be supported in 1.11.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Jan 20, 2020, at 18:46, kant kodali wrote:
>>>>>
>>>>> Hi Jingsong,
>>>>>
>>>>> Thanks a lot, I think I can live with
>>>>> TableEnvironment.createTemporaryView in Flink 1.10 (which I am expecting 
>>>>> to
>>>>> be released this month) but are these views persisted somewhere? for
>>>>> example across sessions? or say I stop my application and start again will
>>>>> it work as expected?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> On Mon, Jan 20, 2020 at 1:12 AM Jingsong Li 
>>>>> wrote:
>>>>>
>>>>>> Hi Kant,
>>>>>>
>>>>>> Sorry, 1.10 not support "CREATE VIEW" in raw SQL too. Workaround is:
>>>>>> - Using TableEnvironment.createTemporaryView...
>>>>>> - Or using "create view" and "drop view" in the sql-client.
>>>>>> - Or using hive catalog, in 1.10, we support query catalog views.
>>>>>>
>>>>>> FLIP-71 will be finished  in 1.11 soon.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Sun, Jan 19, 2020 at 4:10 PM kant kodali 
>>>>>> wrote:
>>>>>>
>>>>>>> I tried the following.
>>>>>>>
>>>>>>> bsTableEnv.sqlUpdate("CREATE VIEW my_view AS SELECT * FROM sample1 FULL 
>>>>>>> OUTER JOIN sample2 on sample1.f0=sample2.f0");
>>>>>>>
>>>>>>> Table result = bsTableEnv.sqlQuery("select * from my_view");
>>>>>>>
>>>>>>> It looks like
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-71+-+E2E+View+support+in+FLINK+SQL
>>>>>>>  Views
>>>>>>> are not supported. Can I expect them to be supported in Flink 1.10?
>>>>>>>
>>>>>>> Currently, with Spark SQL when the query gets big I break it down
>>>>>>> into views and this is one of the most important features my application
>>>>>>> relies on. is there any workaround for this at the moment?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Sat, Jan 18, 2020 at 6:24 PM kant kodali 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> Does Flink 1.9 support create or replace views syntax in raw SQL?
>>>>>>>> like spark streaming does?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
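
To make the catalog route described above a bit more concrete, here is a rough sketch of registering a HiveCatalog so that tables and views created through it are stored in the Hive metastore (which can be backed by Derby, MySQL, etc., depending on how Hive is configured) instead of living only in the session. The catalog name, default database, hive-conf directory, and Hive version below are assumptions, and flink-connector-hive plus the Hive client dependencies are presumed to be on the classpath.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSketch {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Catalog name, database, hive-site.xml location, and Hive version are assumptions.
        HiveCatalog hiveCatalog = new HiveCatalog(
                "myhive",             // catalog name as seen from Flink SQL
                "default",            // default database
                "/path/to/hive-conf", // directory containing hive-site.xml
                "2.3.4");             // Hive version

        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        // Objects created through this catalog are stored in the Hive metastore,
        // so they survive job restarts and are visible across sessions.
    }
}

As Jingsong notes above, creating a view through the catalog in 1.10 still goes through the internal CatalogViewImpl API; the DDL-based createView support is the 1.11 improvement referenced in this thread.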


How can I programmatically set RocksDBStateBackend?

2020-02-27 Thread kant kodali
Hi All,

How can I programmatically set RocksDBStateBackend? I did the following

[image: Screen Shot 2020-02-27 at 4.53.38 AM.png]

env.setStateBackend always shows as deprecated, so what is the right way to do
this in Flink 1.10?

Thanks!
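
For what it's worth, the deprecation warning usually comes from the setStateBackend(AbstractStateBackend) overload being selected; casting the backend to the StateBackend interface (as the join example earlier in this archive does) picks the non-deprecated setStateBackend(StateBackend) overload instead. A minimal sketch, with the checkpoint URI as an assumption:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Cast to StateBackend so the non-deprecated setStateBackend(StateBackend) overload is used.
        env.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/checkpoints"));

        // ... build and execute the job as usual.
    }
}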


Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-02-27 Thread kant kodali
Hi,

1) Where does the hive catalog persist view definitions? In MySQL, or in HDFS?

2) If the views are not persisted, what happens if the application crashes and
is restarted? Will it create the view again and safely read the data from
where it left off?

Thanks!

On Wed, Feb 26, 2020 at 6:47 AM godfrey he  wrote:

> Hi Kant, if you want the store the catalog data in Local Filesystem/HDFS,
> you can implement a user defined catalog (just need to implement Catalog
> interface)
>
> Bests,
> Godfrey
>
> On Wed, Feb 26, 2020 at 12:28 PM, kant kodali wrote:
>
>> Hi Jingsong,
>>
>> Can I store it in Local Filesystem/HDFS?
>>
>> Thanks!
>>
>> On Mon, Jan 20, 2020 at 6:59 PM Jingsong Li 
>> wrote:
>>
>>> Hi Kant,
>>>
>>> If you want your view persisted, you must to dock a catalog like hive
>>> catalog, it stores views in the metastore with mysql.
>>> - In 1.10, you can store views in catalog through "Catalog.createTable",
>>> you can create a "CatalogViewImpl". This is an internal API, which is not
>>> easy to use.
>>> - In 1.11, we will introduce create view DDL for "TableEnv.sqlUpdate"
>>> and "TableEnv.createView". It will be easy to use.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, Jan 21, 2020 at 10:03 AM Jark Wu  wrote:
>>>
>>>> Hi Kant,
>>>>
>>>> The TableEnv#createTemporaryView and CREATE VIEW in SQL Cli both
>>>> creates temporary views which is not persisted and will lost after session
>>>> close.
>>>> I think the persisted views will be supported in 1.11.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Jan 20, 2020, at 18:46, kant kodali wrote:
>>>>
>>>> Hi Jingsong,
>>>>
>>>> Thanks a lot, I think I can live with
>>>> TableEnvironment.createTemporaryView in Flink 1.10 (which I am expecting to
>>>> be released this month) but are these views persisted somewhere? for
>>>> example across sessions? or say I stop my application and start again will
>>>> it work as expected?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> On Mon, Jan 20, 2020 at 1:12 AM Jingsong Li 
>>>> wrote:
>>>>
>>>>> Hi Kant,
>>>>>
>>>>> Sorry, 1.10 not support "CREATE VIEW" in raw SQL too. Workaround is:
>>>>> - Using TableEnvironment.createTemporaryView...
>>>>> - Or using "create view" and "drop view" in the sql-client.
>>>>> - Or using hive catalog, in 1.10, we support query catalog views.
>>>>>
>>>>> FLIP-71 will be finished  in 1.11 soon.
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Sun, Jan 19, 2020 at 4:10 PM kant kodali 
>>>>> wrote:
>>>>>
>>>>>> I tried the following.
>>>>>>
>>>>>> bsTableEnv.sqlUpdate("CREATE VIEW my_view AS SELECT * FROM sample1 FULL 
>>>>>> OUTER JOIN sample2 on sample1.f0=sample2.f0");
>>>>>>
>>>>>> Table result = bsTableEnv.sqlQuery("select * from my_view");
>>>>>>
>>>>>> It looks like
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-71+-+E2E+View+support+in+FLINK+SQL
>>>>>>  Views
>>>>>> are not supported. Can I expect them to be supported in Flink 1.10?
>>>>>>
>>>>>> Currently, with Spark SQL when the query gets big I break it down
>>>>>> into views and this is one of the most important features my application
>>>>>> relies on. is there any workaround for this at the moment?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Sat, Jan 18, 2020 at 6:24 PM kant kodali 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> Does Flink 1.9 support create or replace views syntax in raw SQL?
>>>>>>> like spark streaming does?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>


Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-02-25 Thread kant kodali
Hi Jingsong,

Can I store it in Local Filesystem/HDFS?

Thanks!

On Mon, Jan 20, 2020 at 6:59 PM Jingsong Li  wrote:

> Hi Kant,
>
> If you want your view persisted, you must to dock a catalog like hive
> catalog, it stores views in the metastore with mysql.
> - In 1.10, you can store views in catalog through "Catalog.createTable",
> you can create a "CatalogViewImpl". This is an internal API, which is not
> easy to use.
> - In 1.11, we will introduce create view DDL for "TableEnv.sqlUpdate"
> and "TableEnv.createView". It will be easy to use.
>
> Best,
> Jingsong Lee
>
> On Tue, Jan 21, 2020 at 10:03 AM Jark Wu  wrote:
>
>> Hi Kant,
>>
>> The TableEnv#createTemporaryView and CREATE VIEW in SQL Cli both creates
>> temporary views which is not persisted and will lost after session close.
>> I think the persisted views will be supported in 1.11.
>>
>> Best,
>> Jark
>>
>> On Jan 20, 2020, at 18:46, kant kodali wrote:
>>
>> Hi Jingsong,
>>
>> Thanks a lot, I think I can live with
>> TableEnvironment.createTemporaryView in Flink 1.10 (which I am expecting to
>> be released this month) but are these views persisted somewhere? for
>> example across sessions? or say I stop my application and start again will
>> it work as expected?
>>
>> Thanks!
>>
>>
>> On Mon, Jan 20, 2020 at 1:12 AM Jingsong Li 
>> wrote:
>>
>>> Hi Kant,
>>>
>>> Sorry, 1.10 not support "CREATE VIEW" in raw SQL too. Workaround is:
>>> - Using TableEnvironment.createTemporaryView...
>>> - Or using "create view" and "drop view" in the sql-client.
>>> - Or using hive catalog, in 1.10, we support query catalog views.
>>>
>>> FLIP-71 will be finished  in 1.11 soon.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Sun, Jan 19, 2020 at 4:10 PM kant kodali  wrote:
>>>
>>>> I tried the following.
>>>>
>>>> bsTableEnv.sqlUpdate("CREATE VIEW my_view AS SELECT * FROM sample1 FULL 
>>>> OUTER JOIN sample2 on sample1.f0=sample2.f0");
>>>>
>>>> Table result = bsTableEnv.sqlQuery("select * from my_view");
>>>>
>>>> It looks like
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-71+-+E2E+View+support+in+FLINK+SQL
>>>>  Views
>>>> are not supported. Can I expect them to be supported in Flink 1.10?
>>>>
>>>> Currently, with Spark SQL when the query gets big I break it down into
>>>> views and this is one of the most important features my application relies
>>>> on. is there any workaround for this at the moment?
>>>>
>>>> Thanks!
>>>>
>>>> On Sat, Jan 18, 2020 at 6:24 PM kant kodali  wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Does Flink 1.9 support create or replace views syntax in raw SQL? like
>>>>> spark streaming does?
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>
> --
> Best, Jingsong Lee
>


Re: Can Connected Components run on a streaming dataset using iterate delta?

2020-02-22 Thread kant kodali
Hi,

Thanks for that, but it looks like this is already available for streaming at
https://github.com/vasia/gelly-streaming, though I wonder why it is not part of
Flink? There are no releases either.

Thanks!

On Tue, Feb 18, 2020 at 9:13 AM Yun Gao  wrote:

>Hi Kant,
>
>   As far as I know, I think the current example connected
> components implementation based on DataSet API could not be extended to
> streaming data or incremental batch directly.
>
>   From the algorithm's perspective, if the graph only add edge
> and never remove edge, I think the connected components should be able to
> be updated incrementally when the graph changes: When some edges are added,
> a new search should be started from the sources of the added edges to
> propagate its component ID. This will trigger a new pass of update of the
> following vertices, and the updates continues until no vertices' component
> ID get updated. However, if there are also edge removes, I think the
> incremental computation should not be easily achieved.
>
>   To implement the above logic on Flink, I think currently
> there should be two possible methods:
> 1) Use DataSet API and DataSet iteration, maintains
> the graph structure and the latest computation result in a storage, and
> whenever there are enough changes to the graph, submits a new DataSet job
> to recompute the result. The job should load the edges, the latest
> component id and whether it is the source of the newly added edges for each
> graph vertex, and then start the above incremental computation logic.
> 2) Flink also provide DataStream iteration API[1] that
> enables iterating on the unbounded data. In this case the graph
> modification should be modeled as a datastream, and some operators inside
> the iteration should maintain the graph structure and current component id.
> whenever there are enough changes, it starts a new pass of computation.
>
> Best,
>  Yun
>
> [1] Flink DataStream iteration,
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#iterations
>
> --
> From:kant kodali 
> Send Time:2020 Feb. 18 (Tue.) 15:11
> To:user 
> Subject:Can Connected Components run on a streaming dataset using iterate
> delta?
>
> Hi All,
>
> I am wondering if connected components
> 
> can run on a streaming data? or say incremental batch?
>
> I see that with delta iteration not all vertices need to participate at
> every iteration which is great but in my case the graph is evolving over
> time other words new edges are getting added over time. If so, does the
> algorithm needs to run on the entire graph or can it simply run on the new
> batch of edges?
>
> Finally, What is the best way to compute connected components on Graphs
> evolving over time? Should I use streaming or batch or any custom
> incremental approach? Also, the documentation take maxIterations as an
> input. How can I come up with a good number for max iterations? and once I
> come up with a good number for max Iterations is the algorithm guaranteed
> to converge?
>
>
> Thanks,
> Kant
>
>
>
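
For reference, a minimal sketch of the add-only incremental case described in
the reply above. This is not the iteration-based design; it assumes that all
edges are routed to a single key (so one operator instance owns the whole
union-find) and that the vertex set fits in Flink state:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: incremental connected components for an add-only edge stream,
// keeping a union-find forest in keyed state.
public class IncrementalConnectedComponents
        extends KeyedProcessFunction<Integer, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // vertex id -> parent vertex id (union-find forest)
    private transient MapState<Long, Long> parent;

    @Override
    public void open(Configuration parameters) {
        parent = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("parent", Long.class, Long.class));
    }

    private long find(long v) throws Exception {
        Long p = parent.get(v);
        if (p == null) {
            parent.put(v, v);
            return v;
        }
        while (p != v) {
            v = p;
            p = parent.get(v);
        }
        return v;
    }

    @Override
    public void processElement(Tuple2<Long, Long> edge, Context ctx,
                               Collector<Tuple2<Long, Long>> out) throws Exception {
        long root1 = find(edge.f0);
        long root2 = find(edge.f1);
        if (root1 != root2) {
            long keep = Math.min(root1, root2);
            long merged = Math.max(root1, root2);
            parent.put(merged, keep);
            // emit the merged root with its new component id; a full job would
            // also re-emit every vertex whose component id changed
            out.collect(Tuple2.of(merged, keep));
        }
    }
}

A hypothetical pipeline would apply it as
edges.keyBy(e -> 0).process(new IncrementalConnectedComponents()), where edges
is a DataStream<Tuple2<Long, Long>> of (source, target) vertex ids.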


Can Connected Components run on a streaming dataset using iterate delta?

2020-02-17 Thread kant kodali
Hi All,

I am wondering if connected components can run on streaming data, or say an
incremental batch?

I see that with delta iteration not all vertices need to participate in
every iteration, which is great, but in my case the graph is evolving over
time; in other words, new edges are getting added over time. If so, does the
algorithm need to run on the entire graph, or can it simply run on the new
batch of edges?

Finally, what is the best way to compute connected components on graphs
evolving over time? Should I use streaming, batch, or some custom
incremental approach? Also, the documentation says the algorithm takes
maxIterations as an input. How can I come up with a good number for max
iterations? And once I come up with a good number for maxIterations, is the
algorithm guaranteed to converge?


Thanks,
Kant


Re: is streaming outer join sending unnecessary traffic?

2020-02-01 Thread kant kodali
Wondering if anyone has had a chance to look through this, or should I go
ahead and create a JIRA ticket?



On Wed, Jan 29, 2020 at 6:49 AM Till Rohrmann  wrote:

> Hi Kant,
>
> I am not an expert on Flink's SQL implementation. Hence, I'm pulling in
> Timo and Jark who might help you with your question.
>
> Cheers,
> Till
>
> On Tue, Jan 28, 2020 at 10:46 PM kant kodali  wrote:
>
>> Sorry. fixed some typos.
>>
>> I am doing a streaming outer join from four topics in Kafka lets call
>> them sample1, sample2, sample3, sample4. Each of these test topics has just
>> one column which is of tuple string. my query is this
>>
>> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL 
>> OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on 
>> sample3.f0=sample4.f0
>>
>>
>> And here is how I send messages to those Kafka topics at various times.
>>
>> At time t1 Send a message "flink" to test-topic1
>>
>> (true,flink,null,null,null) // Looks good
>>
>> At time t2 Send a message "flink" to test-topic4
>>
>> (true,null,null,null,flink) // Looks good
>>
>> At time t3 Send a message "flink" to test-topic3
>>
>> (false,null,null,null,flink) // Looks good
>> (true,null,null,flink,flink) //Looks good
>>
>> At time t4 Send a message "flink" to test-topic2
>>
>> (false,flink,null,null,null) // Looks good
>> (false,null,null,flink,flink) // Looks good
>> *(true,null,null,null,flink) // Redundant?*
>> *(false,null,null,null,flink) // Redundant?*
>> (true,flink,flink,flink,flink) //Looks good
>>
>> Assume t1 < t2 < t3 < t4.
>> Those two rows above seem to be redundant to me although the end result
>> is correct. Doesn't see the same behavior if I join two topics. These
>> redundant messages can lead to a lot of database operations underneath so
>> any way to optimize this? I am using Flink 1.9 so not sure if this is
>> already fixed in 1.10.
>>
>> Attached the code as well.
>>
>> Thanks!
>> kant
>>
>>
>> On Tue, Jan 28, 2020 at 1:43 PM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I am doing a streaming outer join from four topics in Kafka lets call
>>> them sample1, sample2, sample3, sample4. Each of these test topics has just
>>> one column which is of tuple string. my query is this
>>>
>>> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL 
>>> OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on 
>>> sample3.f0=sample4.f0
>>>
>>>
>>> And here is how I send messages to those Kafka topics at various times.
>>>
>>> At time t1 Send a message "flink" to test-topic1
>>>
>>> (true,flink,null,null,null) // Looks good
>>>
>>> At time t2 Send a message "flink" to test-topic4
>>>
>>> (true,null,null,null,flink) // Looks good
>>>
>>> At time t3 Send a message "flink" to test-topic3
>>>
>>> (false,null,null,null,flink) // Looks good
>>> (true,null,null,flink,flink) //Looks good
>>>
>>> At time t3 Send a message "flink" to test-topic2
>>>
>>> (false,flink,null,null,null) // Looks good
>>> (false,null,null,flink,flink) // Looks good
>>> *(true,null,null,null,flink) // Redundant?*
>>> *(false,null,null,null,flink) // Redundant?*
>>> (true,flink,flink,flink,flink) //Looks good
>>>
>>> Those two rows above seem to be redundant to be although the end result
>>> is correct. Doesn't see the same behavior if I join two topics. This
>>> unwanted message will lead to a lot of database operations underneath so
>>> any way to optimize this? I am using Flink 1.9 so not sure if this is
>>> already fixed in 1.10.
>>>
>>> Attached the code as well.
>>>
>>> Thanks!
>>> kant
>>>
>>>
>>>
>>>


Cypher support for flink graphs?

2020-01-29 Thread kant kodali
Hi All,

Can we expect open cypher support for Flink graphs?

Thanks!


Re: is streaming outer join sending unnecessary traffic?

2020-01-28 Thread kant kodali
Sorry. fixed some typos.

I am doing a streaming outer join from four topics in Kafka lets call them
sample1, sample2, sample3, sample4. Each of these test topics has just one
column which is of tuple string. my query is this

SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0
FULL OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN
sample4 on sample3.f0=sample4.f0


And here is how I send messages to those Kafka topics at various times.

At time t1 Send a message "flink" to test-topic1

(true,flink,null,null,null) // Looks good

At time t2 Send a message "flink" to test-topic4

(true,null,null,null,flink) // Looks good

At time t3 Send a message "flink" to test-topic3

(false,null,null,null,flink) // Looks good
(true,null,null,flink,flink) //Looks good

At time t4 Send a message "flink" to test-topic2

(false,flink,null,null,null) // Looks good
(false,null,null,flink,flink) // Looks good
*(true,null,null,null,flink) // Redundant?*
*(false,null,null,null,flink) // Redundant?*
(true,flink,flink,flink,flink) //Looks good

Assume t1 < t2 < t3 < t4.
Those two rows above seem to be redundant to me although the end result
is correct. Doesn't see the same behavior if I join two topics. These
redundant messages can lead to a lot of database operations underneath so
any way to optimize this? I am using Flink 1.9 so not sure if this is
already fixed in 1.10.

Attached the code as well.

Thanks!
kant


On Tue, Jan 28, 2020 at 1:43 PM kant kodali  wrote:

> Hi All,
>
> I am doing a streaming outer join from four topics in Kafka lets call them
> sample1, sample2, sample3, sample4. Each of these test topics has just one
> column which is of tuple string. my query is this
>
> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL 
> OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on 
> sample3.f0=sample4.f0
>
>
> And here is how I send messages to those Kafka topics at various times.
>
> At time t1 Send a message "flink" to test-topic1
>
> (true,flink,null,null,null) // Looks good
>
> At time t2 Send a message "flink" to test-topic4
>
> (true,null,null,null,flink) // Looks good
>
> At time t3 Send a message "flink" to test-topic3
>
> (false,null,null,null,flink) // Looks good
> (true,null,null,flink,flink) //Looks good
>
> At time t3 Send a message "flink" to test-topic2
>
> (false,flink,null,null,null) // Looks good
> (false,null,null,flink,flink) // Looks good
> *(true,null,null,null,flink) // Redundant?*
> *(false,null,null,null,flink) // Redundant?*
> (true,flink,flink,flink,flink) //Looks good
>
> Those two rows above seem to be redundant to be although the end result is
> correct. Doesn't see the same behavior if I join two topics. This unwanted
> message will lead to a lot of database operations underneath so any way to
> optimize this? I am using Flink 1.9 so not sure if this is already fixed in
> 1.10.
>
> Attached the code as well.
>
> Thanks!
> kant
>
>
>
>


is streaming outer join sending unnecessary traffic?

2020-01-28 Thread kant kodali
Hi All,

I am doing a streaming outer join from four topics in Kafka lets call them
sample1, sample2, sample3, sample4. Each of these test topics has just one
column which is of tuple string. my query is this

SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0
FULL OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN
sample4 on sample3.f0=sample4.f0


And here is how I send messages to those Kafka topics at various times.

At time t1 Send a message "flink" to test-topic1

(true,flink,null,null,null) // Looks good

At time t2 Send a message "flink" to test-topic4

(true,null,null,null,flink) // Looks good

At time t3 Send a message "flink" to test-topic3

(false,null,null,null,flink) // Looks good
(true,null,null,flink,flink) //Looks good

At time t3 Send a message "flink" to test-topic2

(false,flink,null,null,null) // Looks good
(false,null,null,flink,flink) // Looks good
*(true,null,null,null,flink) // Redundant?*
*(false,null,null,null,flink) // Redundant?*
(true,flink,flink,flink,flink) //Looks good

Those two rows above seem to be redundant to me although the end result is
correct. I don't see the same behavior if I join two topics. This unwanted
message will lead to a lot of database operations underneath so any way to
optimize this? I am using Flink 1.9 so not sure if this is already fixed in
1.10.

Attached the code as well.

Thanks!
kant


Test.java
Description: Binary data


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread kant kodali
Hi Arvid,

I am trying to understand your statement. I am new to Flink, so excuse me if
I don't know something I should have known. A ProcessFunction just processes
the records, right? If so, how is it better than writing to an external
system? At the end of the day I want to be able to query it (it doesn't have
to be through Queryable State, and I probably don't want to use Queryable
State because of its limitations). But ideally I want to be able to query
the intermediate state using SQL, and hopefully the store that is
maintaining the intermediate state has some sort of index support so that
read queries are faster than doing a full scan.

Also, I hear that querying intermediate state just like one would in a database
is a widely requested feature, so it's a bit surprising that this is not
solved just yet, but I am hopeful!

Thanks!



On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise  wrote:

> Hi Kant,
>
> just wanted to mention the obvious. If you add a ProcessFunction right
> after the join, you could maintain a user state with the same result. That
> will of course blow up the data volume by a factor of 2, but may still be
> better than writing to an external system.
>
> On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Dang what a massive PR: files changed: 2,118, +104,104 −29,161 lines
>> changed.
>> Thanks for the details, Jark!
>>
>> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu  wrote:
>>
>>> Hi Kant,
>>> Having a custom state backend is very difficult and is not recommended.
>>>
>>> Hi Benoît,
>>> Yes, the "Query on the intermediate state is on the roadmap" I
>>> mentioned is referring to integrate Table API & SQL with Queryable State.
>>> We also have an early issue FLINK-6968 to tracks this.
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
>>> benoit.pa...@centraliens-lille.org> wrote:
>>>
>>>> Hi all!
>>>>
>>>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
>>>> on the intermediate state is on the roadmap"?
>>>> Are you referring to working on
>>>> QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME
>>>> AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>>>>
>>>> Cheers
>>>> Ben
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>>>
>>>>
>>>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali  wrote:
>>>>
>>>>> Is it a common practice to have a custom state backend? if so, what
>>>>> would be a popular custom backend?
>>>>>
>>>>> Can I do Elasticseatch as a state backend?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:
>>>>>
>>>>>> Hi Kant,
>>>>>>
>>>>>> 1) List of row is also sufficient in this case. Using a MapState is
>>>>>> in order to retract a row faster, and save the storage size.
>>>>>>
>>>>>> 2) State Process API is usually used to process save point. I’m
>>>>>> afraid the performance is not good to use it for querying.
>>>>>> On the other side, AFAIK, State Process API requires the uid of
>>>>>> operator. However, uid of operators is not set in Table API & SQL.
>>>>>> So I’m not sure whether it works or not.
>>>>>>
>>>>>> 3)You can have a custom statebackend by
>>>>>> implement org.apache.flink.runtime.state.StateBackend interface, and use 
>>>>>> it
>>>>>> via `env.setStateBackend(…)`.
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:
>>>>>>
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> 1) shouldn't it be a col1 to List of row? multiple rows can have the
>>>>>>> same joining key right?
>>>>>>>
>>>>>>> 2) Can I use state processor API
>>>>>>> <https://flink.apache.org/feature/2019
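
For reference, a rough sketch of the ProcessFunction idea mentioned in this
thread (assumptions: the join result has been converted with toRetractStream
and keyed by the join key; Row.toString() is used as the map key only to keep
the example short, and a real version would keep a per-row count to handle
duplicate rows):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Sketch: mirrors the current join result per key by applying inserts (true)
// and retractions (false) from the retract stream into its own MapState.
public class JoinResultMirror
        extends KeyedProcessFunction<String, Tuple2<Boolean, Row>, Row> {

    private transient MapState<String, Row> current;

    @Override
    public void open(Configuration parameters) {
        current = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("current-join-result", String.class, Row.class));
    }

    @Override
    public void processElement(Tuple2<Boolean, Row> change, Context ctx,
                               Collector<Row> out) throws Exception {
        if (change.f0) {
            current.put(change.f1.toString(), change.f1);
            out.collect(change.f1);          // forward inserts downstream if desired
        } else {
            current.remove(change.f1.toString());
        }
    }
}

It could be wired up as
tableEnv.toRetractStream(result, Row.class)
        .keyBy(t -> String.valueOf(t.f1.getField(0)))   // assuming field 0 is the join key
        .process(new JoinResultMirror());
(tableEnv and result are placeholders for the actual environment and joined
Table), and the mirrored state could then be exposed through queryable state
or written to an external sink for ad-hoc queries.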

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-22 Thread kant kodali
Is it common practice to have a custom state backend? If so, what would
be a popular custom backend?

Can I use Elasticsearch as a state backend?

Thanks!

On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:

> Hi Kant,
>
> 1) List of row is also sufficient in this case. Using a MapState is in
> order to retract a row faster, and save the storage size.
>
> 2) State Process API is usually used to process save point. I’m afraid the
> performance is not good to use it for querying.
> On the other side, AFAIK, State Process API requires the uid of
> operator. However, uid of operators is not set in Table API & SQL.
> So I’m not sure whether it works or not.
>
> 3)You can have a custom statebackend by
> implement org.apache.flink.runtime.state.StateBackend interface, and use it
> via `env.setStateBackend(…)`.
>
> Best,
> Jark
>
> On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:
>
>> Hi Jark,
>>
>> 1) shouldn't it be a col1 to List of row? multiple rows can have the same
>> joining key right?
>>
>> 2) Can I use state processor API
>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>> from an external application to query the intermediate results in near
>> real-time? I thought querying rocksdb state is a widely requested feature.
>> It would be really great to consider this feature for 1.11
>>
>> 3) Is there any interface where I can implement my own state backend?
>>
>> Thanks!
>>
>>
>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>>
>>> Hi Kant,
>>>
>>> 1) Yes, it will be stored in rocksdb statebackend.
>>> 2) In the old planner, the left state is the same as the right state: both are
>>> a `MapState` from the input row to a (count, expiredTime) tuple.
>>> It is a 2-level map structure, where the `col1` is the join key, it
>>> is the first-level key of the state. The key of the MapState is the input
>>> row,
>>> and the `count` is the number of this row, the expiredTime indicates
>>> when to cleanup this row (avoid infinite state size). You can find the
>>> source code here[1].
>>> In blink planner, the state structure will be more complex which is
>>> determined by the meta-information of upstream. You can see the source code
>>> of blink planner here [2].
>>> 3) Currently, the intermediate state is not exposed to users. Usually,
>>> users should write the query result to an external system (like Mysql) and
>>> query the external system.
>>> Query on the intermediate state is on the roadmap, but I guess it is
>>> not in 1.11 plan.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>> [2]:
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>
>>>
>>> 2020年1月21日 18:01,kant kodali  写道:
>>>
>>> Hi All,
>>>
>>> If I run a query like this
>>>
>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>>> table1.col1 = table2.col1")
>>>
>>> 1) Where will flink store the intermediate result? Imagine
>>> flink-conf.yaml says state.backend = 'rocksdb'
>>>
>>> 2) If the intermediate results are stored in rockdb then what is the key
>>> and value in this case(given the query above)?
>>>
>>> 3) What is the best way to query these intermediate results from an
>>> external application? while the job is running and while the job is not
>>> running?
>>>
>>> Thanks!
>>>
>>>
>>>
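
As a reference for point 3) above, a minimal sketch of plugging in a state
backend programmatically. RocksDBStateBackend is the built-in disk-backed
option (a fully custom backend would instead implement the StateBackend
interface mentioned above), and the checkpoint URI is just a placeholder:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // working state lives in RocksDB on local disk and is checkpointed to the URI;
        // 'true' enables incremental checkpoints (requires flink-statebackend-rocksdb)
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));
        env.enableCheckpointing(60_000);
        // ... register tables, run the join query, add sinks, then env.execute(...)
    }
}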


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-21 Thread kant kodali
Hi Jark,

1) Shouldn't it be a mapping from col1 to a list of rows? Multiple rows can
have the same joining key, right?

2) Can I use the State Processor API
<https://flink.apache.org/feature/2019/09/13/state-processor-api.html> from
an external application to query the intermediate results in near
real-time? I thought querying RocksDB state is a widely requested feature.
It would be really great to consider this feature for 1.11.

3) Is there any interface where I can implement my own state backend?

Thanks!


On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:

> Hi Kant,
>
> 1) Yes, it will be stored in rocksdb statebackend.
> 2) In the old planner, the left state is the same as the right state: both are
> a `MapState` from the input row to a (count, expiredTime) tuple.
> It is a 2-level map structure, where the `col1` is the join key, it is
> the first-level key of the state. The key of the MapState is the input row,
> and the `count` is the number of this row, the expiredTime indicates
> when to cleanup this row (avoid infinite state size). You can find the
> source code here[1].
> In blink planner, the state structure will be more complex which is
> determined by the meta-information of upstream. You can see the source code
> of blink planner here [2].
> 3) Currently, the intermediate state is not exposed to users. Usually,
> users should write the query result to an external system (like Mysql) and
> query the external system.
> Query on the intermediate state is on the roadmap, but I guess it is
> not in 1.11 plan.
>
> Best,
> Jark
>
> [1]:
> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
> [2]:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>
>
> 2020年1月21日 18:01,kant kodali  写道:
>
> Hi All,
>
> If I run a query like this
>
> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
> table1.col1 = table2.col1")
>
> 1) Where will flink store the intermediate result? Imagine flink-conf.yaml
> says state.backend = 'rocksdb'
>
> 2) If the intermediate results are stored in rockdb then what is the key
> and value in this case(given the query above)?
>
> 3) What is the best way to query these intermediate results from an
> external application? while the job is running and while the job is not
> running?
>
> Thanks!
>
>
>


where does flink store the intermediate results of a join and what is the key?

2020-01-21 Thread kant kodali
Hi All,

If I run a query like this

StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
table1.col1 = table2.col1")

1) Where will flink store the intermediate result? Imagine flink-conf.yaml
says state.backend = 'rocksdb'

2) If the intermediate results are stored in RocksDB, then what are the key
and value in this case (given the query above)?

3) What is the best way to query these intermediate results from an
external application? while the job is running and while the job is not
running?

Thanks!


Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-01-20 Thread kant kodali
Hi Jingsong,

Thanks a lot. I think I can live with TableEnvironment.createTemporaryView
in Flink 1.10 (which I am expecting to be released this month), but are
these views persisted somewhere, for example across sessions? Or if I stop
my application and start it again, will it work as expected?

Thanks!


On Mon, Jan 20, 2020 at 1:12 AM Jingsong Li  wrote:

> Hi Kant,
>
> Sorry, 1.10 does not support "CREATE VIEW" in raw SQL either. Workarounds are:
> - Using TableEnvironment.createTemporaryView...
> - Or using "create view" and "drop view" in the sql-client.
> - Or using the Hive catalog; in 1.10, we support querying catalog views.
>
> FLIP-71 will be finished in 1.11 soon.
>
> Best,
> Jingsong Lee
>
> On Sun, Jan 19, 2020 at 4:10 PM kant kodali  wrote:
>
>> I tried the following.
>>
>> bsTableEnv.sqlUpdate("CREATE VIEW my_view AS SELECT * FROM sample1 FULL 
>> OUTER JOIN sample2 on sample1.f0=sample2.f0");
>>
>> Table result = bsTableEnv.sqlQuery("select * from my_view");
>>
>> It looks like
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-71+-+E2E+View+support+in+FLINK+SQL
>>  Views
>> are not supported. Can I expect them to be supported in Flink 1.10?
>>
>> Currently, with Spark SQL when the query gets big I break it down into
>> views and this is one of the most important features my application relies
>> on. is there any workaround for this at the moment?
>>
>> Thanks!
>>
>> On Sat, Jan 18, 2020 at 6:24 PM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> Does Flink 1.9 support create or replace views syntax in raw SQL? like
>>> spark streaming does?
>>>
>>> Thanks!
>>>
>>
>
> --
> Best, Jingsong Lee
>
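
A small sketch of the first workaround (Flink 1.10 Table API, reusing the
sample1/sample2 tables from the earlier example). A temporary view only lives
for the lifetime of the TableEnvironment, so it is not persisted across
sessions or application restarts; for that, the view would have to be created
in a persistent catalog such as Hive:

Table joined = bsTableEnv.sqlQuery(
        "SELECT * FROM sample1 FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0");
bsTableEnv.createTemporaryView("my_view", joined);
Table result = bsTableEnv.sqlQuery("SELECT * FROM my_view");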


Re: Does Flink 1.9 support create or replace views syntax in raw sql?

2020-01-19 Thread kant kodali
 I tried the following.

bsTableEnv.sqlUpdate("CREATE VIEW my_view AS SELECT * FROM sample1
FULL OUTER JOIN sample2 on sample1.f0=sample2.f0");

Table result = bsTableEnv.sqlQuery("select * from my_view");

It looks like
https://cwiki.apache.org/confluence/display/FLINK/FLIP-71+-+E2E+View+support+in+FLINK+SQL
Views
are not supported. Can I expect them to be supported in Flink 1.10?

Currently, with Spark SQL when the query gets big I break it down into
views and this is one of the most important features my application relies
on. is there any workaround for this at the moment?

Thanks!

On Sat, Jan 18, 2020 at 6:24 PM kant kodali  wrote:

> Hi All,
>
> Does Flink 1.9 support create or replace views syntax in raw SQL? like
> spark streaming does?
>
> Thanks!
>


Re: some basic questions

2020-01-18 Thread kant kodali
Hi Godfrey,

I was just clicking the run button in my IDE and it doesn't really show me
errors, so I used the command-line flink run and that shows me what the
error is. It tells me I need to change to toRetractStream(), and both
StreamExecutionEnvironment.execute and StreamTableEnvironment.execute seem to
work fine, although I am not sure which one is the correct usage.

Thanks!

On Sat, Jan 18, 2020 at 6:52 PM kant kodali  wrote:

> Hi Godfrey,
>
> Thanks a lot for your response. I just tried it with env.execute("simple
> job") but I still get the same error message.
>
> Kant
>
> On Sat, Jan 18, 2020 at 6:26 PM godfrey he  wrote:
>
>> hi kant,
>>
>> > 1) The Documentation says full outer join is supported however the
>> below code just exits with value 1. No error message.
>> if you have converted Table to DataStream, please execute it
>> with StreamExecutionEnvironment ( call env.execute("simple job") )
>>
>> > 2) If I am using a blink planner should I use TableEnvironment or
>> StreamTableEnvironment ?
>> for streaming job, both Environment can be used. the difference is:
>>   TableEnvironment will optimize multiple queries into one DAG when
>> executing, while StreamTableEnvironment will independently optimize each
>> query.
>>   StreamTableEnvironment supports convert from/to DataStream,
>> while TableEnvironment does not support it.
>>   StreamTableEnvironment supports register TableFunction
>> and AggregateFunction, while TableEnvironment does not support it now.
>>
>> for batch jobs, TableEnvironment is the only choice, because
>> DataStream does not support batch job now.
>>
>> > 3) Why flink current stable documentation(1.9) recommends (old
>> planner)? any rough timeline on when we would be able to use blink planner
>> in production? perhaps 1.10 or 1.11?
>> 1.9 is blink planner's first version, and it is unstable. In 1.10, blink
>> planner is more stable, and we are switching the blink planner to the default
>> step by step [0].
>>
>> [0]
>> http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCAELO930%2B3RJ5m4hGQ7fbS-CS%3DcfJe5ENcRmZ%3DT_hey-uL6c27g%40mail.gmail.com%3E
>>
>> kant kodali  于2020年1月18日周六 下午5:40写道:
>>
>>> Hi All,
>>>
>>> 1) The Documentation says full outer join is supported however the below
>>> code just exits with value 1. No error message.
>>>
>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>> import org.apache.flink.table.api.*;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.types.Row;
>>>
>>> import java.util.Properties;
>>>
>>> public class Test {
>>>
>>> public static void main(String... args) throws Exception {
>>>
>>> EnvironmentSettings bsSettings = 
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> StreamTableEnvironment bsTableEnv = 
>>> StreamTableEnvironment.create(env, bsSettings);
>>>
>>> Properties properties = new Properties();
>>> properties.setProperty("bootstrap.servers", "localhost:9092");
>>> properties.setProperty("group.id", "test");
>>>
>>> FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
>>> java.util.regex.Pattern.compile("test-topic1"),
>>> new SimpleStringSchema(),
>>> properties);
>>> FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
>>> java.util.regex.Pattern.compile("test-topic2"),
>>> new SimpleStringSchema(),
>>> properties);
>>>
>>> DataStream<String> stream1 = env.addSource(consumer1);
>>> DataStream<String> stream2 = env.addSource(consumer2);
>>>
>>> bsTableEnv.registerDataStream("sample1", stream1);
>>> bsTableEnv.registerDataStream("sample2", stream2);
>>>
>>> Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 FULL 
>>> OUTER JOIN sample2 on sample1.f0=sample2.f0");
>>> result.printSchema();
>>>
>>> bsTableEnv.toAppendStream(result, Row.class).print();
>>> bsTableEnv.execute("sample job");
>>> }
>>> }
>>>
>>>
>>> 2) If I am using a blink planner should I use TableEnvironment or
>>> StreamTableEnvironment ?
>>>
>>> 3) Why flink current stable documentation(1.9) recommends (old planner)?
>>> any rough timeline on when we would be able to use blink planner in
>>> production? perhaps 1.10 or 1.11?
>>>
>>> Thanks!
>>>
>>>
>>>
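
For reference, a short sketch of the fix this thread converges on (same
sample1/sample2 tables as in the quoted code above): a non-windowed FULL OUTER
JOIN produces an updating result, so it has to be converted with
toRetractStream rather than toAppendStream, and the job is then started via
env.execute:

Table result = bsTableEnv.sqlQuery(
        "SELECT * FROM sample1 FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0");
// each element is (flag, row): true = insert, false = retraction of an earlier row
bsTableEnv.toRetractStream(result, Row.class).print();
env.execute("sample job");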


Re: some basic questions

2020-01-18 Thread kant kodali
Hi Godfrey,

Thanks a lot for your response. I just tried it with env.execute("simple
job") but I still get the same error message.

Kant

On Sat, Jan 18, 2020 at 6:26 PM godfrey he  wrote:

> hi kant,
>
> > 1) The Documentation says full outer join is supported however the below
> code just exits with value 1. No error message.
> if you have converted Table to DataStream, please execute it
> with StreamExecutionEnvironment ( call env.execute("simple job") )
>
> > 2) If I am using a blink planner should I use TableEnvironment or
> StreamTableEnvironment ?
> for streaming job, both Environment can be used. the difference is:
>   TableEnvironment will optimize multiple queries into one DAG when
> executing, while StreamTableEnvironment will independently optimize each
> query.
>   StreamTableEnvironment supports convert from/to DataStream,
> while TableEnvironment does not support it.
>   StreamTableEnvironment supports register TableFunction
> and AggregateFunction, while TableEnvironment does not support it now.
>
> for batch jobs, TableEnvironment is the only choice, because
> DataStream does not support batch job now.
>
> > 3) Why flink current stable documentation(1.9) recommends (old planner)?
> any rough timeline on when we would be able to use blink planner in
> production? perhaps 1.10 or 1.11?
> 1.9 is blink planner's first version, and it is unstable. In 1.10, blink
> planner is more stable, and we are switching the blink planner to the default
> step by step [0].
>
> [0]
> http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCAELO930%2B3RJ5m4hGQ7fbS-CS%3DcfJe5ENcRmZ%3DT_hey-uL6c27g%40mail.gmail.com%3E
>
> kant kodali  于2020年1月18日周六 下午5:40写道:
>
>> Hi All,
>>
>> 1) The Documentation says full outer join is supported however the below
>> code just exits with value 1. No error message.
>>
>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>> import org.apache.flink.table.api.*;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.types.Row;
>>
>> import java.util.Properties;
>>
>> public class Test {
>>
>> public static void main(String... args) throws Exception {
>>
>> EnvironmentSettings bsSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment bsTableEnv = 
>> StreamTableEnvironment.create(env, bsSettings);
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("group.id", "test");
>>
>> FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
>> java.util.regex.Pattern.compile("test-topic1"),
>> new SimpleStringSchema(),
>> properties);
>> FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
>> java.util.regex.Pattern.compile("test-topic2"),
>> new SimpleStringSchema(),
>> properties);
>>
>> DataStream<String> stream1 = env.addSource(consumer1);
>> DataStream<String> stream2 = env.addSource(consumer2);
>>
>> bsTableEnv.registerDataStream("sample1", stream1);
>> bsTableEnv.registerDataStream("sample2", stream2);
>>
>> Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 FULL OUTER 
>> JOIN sample2 on sample1.f0=sample2.f0");
>> result.printSchema();
>>
>> bsTableEnv.toAppendStream(result, Row.class).print();
>> bsTableEnv.execute("sample job");
>> }
>> }
>>
>>
>> 2) If I am using a blink planner should I use TableEnvironment or
>> StreamTableEnvironment ?
>>
>> 3) Why flink current stable documentation(1.9) recommends (old planner)?
>> any rough timeline on when we would be able to use blink planner in
>> production? perhaps 1.10 or 1.11?
>>
>> Thanks!
>>
>>
>>


Does Flink 1.9 support create or replace views syntax in raw sql?

2020-01-18 Thread kant kodali
Hi All,

Does Flink 1.9 support create or replace views syntax in raw SQL? like
spark streaming does?

Thanks!


some basic questions

2020-01-18 Thread kant kodali
Hi All,

1) The documentation says full outer join is supported; however, the code
below just exits with value 1 and no error message.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Properties;

public class Test {

public static void main(String... args) throws Exception {

EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(env, bsSettings);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic1"),
new SimpleStringSchema(),
properties);
FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic2"),
new SimpleStringSchema(),
properties);

DataStream<String> stream1 = env.addSource(consumer1);
DataStream<String> stream2 = env.addSource(consumer2);

bsTableEnv.registerDataStream("sample1", stream1);
bsTableEnv.registerDataStream("sample2", stream2);

Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 FULL
OUTER JOIN sample2 on sample1.f0=sample2.f0");
result.printSchema();

bsTableEnv.toAppendStream(result, Row.class).print();
bsTableEnv.execute("sample job");
}
}


2) If I am using a blink planner should I use TableEnvironment or
StreamTableEnvironment ?

3) Why does the current stable Flink documentation (1.9) recommend the old
planner? Is there any rough timeline on when we would be able to use the
Blink planner in production? Perhaps 1.10 or 1.11?

Thanks!


Why would indefinitely growing state be an issue for Flink while doing stream to stream joins?

2020-01-16 Thread kant kodali
Hi All,

The doc
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#regular-joins
says
the following.

"However, this operation has an important implication: it requires to keep
both sides of the join input in Flink’s state forever. Thus, the resource
usage will grow indefinitely as well, if one or both input tables are
continuously growing"

I wonder why this would be an issue especially when the state is stored in
RocksDB which in turn is backed by disk?

I have a use case where I might need to do a stream-stream join, or some
emulation of that, across say 6 or more tables, and I don't know for sure how
long I need to keep the state because a row today can join with a row a
year or two years from now. Will that be an issue? Do I need to think about
designing a solution in another way, without using a stream-stream join?

Thanks!
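
One common mitigation, sketched below for recent versions such as Flink 1.10
and assuming a StreamTableEnvironment named tableEnv: idle state retention
time bounds the join state, at the cost that rows whose key has been idle
longer than the maximum retention are dropped and can no longer join, so it
does not by itself solve the join-two-years-later case:

import org.apache.flink.api.common.time.Time;

// keep per-key join state for between 1 and 2 days of inactivity;
// state idle longer than the maximum is cleaned up
tableEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));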


Is there a way to clean up the state based on custom/business logic?

2020-01-14 Thread kant kodali
Hi All,

I read through the doc below and I am wondering if I can clean up the state
based on custom logic rather than min and max retention time?

For example, I want to be able to say: clean up all the state where the key =
foo or the value = bar. So until the keys reach a particular value, just keep
accumulating (retaining), and when they do reach a particular value, write
to some sink and clean up. I am looking for something along those lines;
please let me know.

Thanks


https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html
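
At the DataStream level this kind of rule can be written directly. Below is a
sketch under assumed inputs (a keyed stream of String values, with the literal
"bar" standing in for the business condition): keep accumulating per key in
state, and clear the state explicitly once the condition is met instead of
relying on retention time:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: buffer all values per key until a "closing" value arrives, then emit
// the accumulated values downstream and clear the state for that key.
public class AccumulateUntilDone extends KeyedProcessFunction<String, String, String> {

    private transient ListState<String> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        buffer.add(value);
        if ("bar".equals(value)) {               // stand-in for the business rule
            out.collect(String.join(",", buffer.get()));
            buffer.clear();                       // custom cleanup instead of TTL
        }
    }
}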


Re: are blink changes merged into flink 1.9?

2020-01-12 Thread kant kodali
Hi,

Thanks for the response! The documentation says Blink will treat batch as a
special case of streaming. If so, can I stream from a database? Or, to put it
another way: if I have two streams A & B, where stream A is writing to a
database and stream B is reading from the database, will stream B keep
running forever, picking up all the changes written to the database by
stream A? If not, is there any way to achieve that sort of functionality?

The reason I may want to do something like this is that I may want to do a
join on two tables that continuously get populated; however, I cannot specify
the time constraint on the state, which is why I would want to write to an
external database, so that I can treat it as an infinite store and then do
the join as the data comes in continuously.

Thanks!




On Sat, Jan 11, 2020 at 6:42 PM Benchao Li  wrote:

> Hi kant,
>
> Blink has merged most of it's functionality into 1.9-release, and we have
> two planners [1] in 1.9:
>
>- Blink Planner
>- Old Planner (Legacy Planner)
>
> You can try out blink planner by [2].
> Hope this helps.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#main-differences-between-the-two-planners
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#create-a-tableenvironment
>
>
> kant kodali  于2020年1月12日周日 上午7:48写道:
>
>> Hi All,
>>
>> Are blink changes merged into flink 1.9? It looks like there are a lot of
>> features and optimizations in Blink and if they aren't merged into flink
>> 1.9 I am not sure on which one to use? is there any plan towards merging it?
>>
>> Thanks!
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


are blink changes merged into flink 1.9?

2020-01-11 Thread kant kodali
Hi All,

Are blink changes merged into flink 1.9? It looks like there are a lot of
features and optimizations in Blink and if they aren't merged into flink
1.9 I am not sure on which one to use? is there any plan towards merging it?

Thanks!


Re: Are there pipeline API's for ETL?

2020-01-10 Thread kant kodali
Hi Vino,

Another use case would be I want to build a dag of batch sources, sinks and 
transforms and I want to schedule the jobs periodically. One can say similar to 
airflow but a Flink api would be lot better!

Sent from my iPhone

> On Jan 10, 2020, at 6:42 PM, vino yang  wrote:
> 
> 
> Hi kant,
> 
> Can you provide more context about your question? What do you mean about 
> "pipeline API"?
> 
> IMO, you can build an ETL pipeline via composing several Flink transform 
> APIs. About choosing which transform APIs, it depends on your business logic. 
> 
> Here are the generic APIs list.[1]
> 
> Best,
> Vino
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/index.html
> 
> kant kodali  于2020年1月11日周六 上午9:06写道:
>> Hi All,
>> 
>> I am wondering if there are pipeline API's for ETL?
>> 
>> Thanks!
>> 
>> 


Re: Are there pipeline API's for ETL?

2020-01-10 Thread kant kodali
Hi Vino,

I am new to Flink. I was thinking more of a dag builder api where I can build 
a dag of sources, sinks and transforms, and hopefully Flink takes care of the entire 
life cycle of the dag.

An example would be CDAP pipeline api.

Sent from my iPhone

> On Jan 10, 2020, at 6:42 PM, vino yang  wrote:
> 
> 
> Hi kant,
> 
> Can you provide more context about your question? What do you mean about 
> "pipeline API"?
> 
> IMO, you can build an ETL pipeline via composing several Flink transform 
> APIs. About choosing which transform APIs, it depends on your business logic. 
> 
> Here are the generic APIs list.[1]
> 
> Best,
> Vino
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/index.html
> 
> kant kodali  于2020年1月11日周六 上午9:06写道:
>> Hi All,
>> 
>> I am wondering if there are pipeline API's for ETL?
>> 
>> Thanks!
>> 
>> 


Are there pipeline API's for ETL?

2020-01-10 Thread kant kodali
Hi All,

I am wondering if there are pipeline API's for ETL?

Thanks!


Re: How to emit changed data only w/ Flink trigger?

2019-11-01 Thread kant kodali
I am new to Flink so I am not sure if I am giving you the correct answer so
you might want to wait for others to respond. But I think you should do

.inUpsertMode()


On Fri, Nov 1, 2019 at 2:38 AM Qi Kang  wrote:

> Hi all,
>
>
> We have a Flink job which aggregates sales volume and GMV data of each
> site on a daily basis. The code skeleton is shown as follows.
>
>
> ```
> sourceStream
>  .map(message -> JSON.parseObject(message, OrderDetail.class))
>  .keyby("siteId")
>  .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
>  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>  .aggregate(new VolumeGmvAggregateFunc());
> ```
>
>
> The window is triggered every second in order to refresh the data
> displayed on a real-time dashboard. Is there some way to output only those
> sites’ data which changed in 1 second period? Currently we’ve got 1000+
> sites, so frequently emitting all aggregation records seems somewhat
> expensive.
>
>
> BR, Qi Kang
>
>
>
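
One way to suppress unchanged records after each trigger firing, independent
of the upsert-mode idea above (a sketch assuming the aggregation emits
(siteId, gmv) pairs): key the aggregated stream by siteId and drop records
whose value equals the last one forwarded for that key:

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Sketch: forwards a (siteId, gmv) record only if the value changed since the
// last record that was let through for the same key.
public class EmitOnChange extends RichFilterFunction<Tuple2<String, Long>> {

    private transient ValueState<Long> lastGmv;

    @Override
    public void open(Configuration parameters) {
        lastGmv = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-gmv", Long.class));
    }

    @Override
    public boolean filter(Tuple2<String, Long> record) throws Exception {
        Long previous = lastGmv.value();
        if (previous != null && previous.equals(record.f1)) {
            return false;                 // unchanged since the last firing, suppress
        }
        lastGmv.update(record.f1);
        return true;
    }
}

It would be applied after the aggregation, e.g.
aggregated.keyBy(t -> t.f0).filter(new EmitOnChange()) before the sink.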


is Streaming Ledger open source?

2019-11-01 Thread kant kodali
Hi All,

Is https://github.com/dataArtisans/da-streamingledger an open-source
project? It looks to me like this project is not actively maintained. Is that
correct, given that the last commit is one year ago and it shows there are 0
contributors?

Thanks!


Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread kant kodali
Hi Huyen,

That is not my problem statement.  If it is just ingesting from A to B I am
sure there are enough tutorials for me to get it done. I also feel like the
more I elaborate the more confusing it gets and I am not sure why.

I want to join two streams and I want to see/query the results of that join
in real-time but I also have some constraints.

I come from the Spark world, and in Spark, if I do an inner join of two
streams A & B, then I can see results only when there are matching rows
between A & B (by definition of inner join this makes sense). But if I do an
outer join of two streams A & B, I need to specify a time constraint, and
only when that time has fully elapsed can I see the rows that did not match.
This means if my time constraint is one hour, I cannot query the intermediate
results until one hour has passed, and this is not the behavior I want. I
want to be able to do all sorts of SQL queries on the intermediate results.

I like Flink's idea of externalizing the state, so that I don't have to
worry about memory, but I am also trying to avoid writing a separate
microservice that needs to poll and display the intermediate results of the
join in real-time. Instead, I am trying to see if there is a way to treat
those constantly evolving intermediate results as a streaming source, and
maybe do some more transformations and push them out to another sink.

Hope that makes sense.

Thanks,
Kant



On Thu, Oct 31, 2019 at 2:43 AM Huyen Levan  wrote:

> Hi Kant,
>
> So your problem statement is "ingest 2 streams into a data warehouse". The
> main component of the solution, from my view, is that SQL server. You can
> have a sink function to insert records in your two streams into two
> different tables (A and B), or upsert into one single table C. That upsert
> action itself serves as a join function, there's no need to join in Flink
> at all.
>
> There are many tools out there can be used for that ingestion. Flink, of
> course, can be used for that purpose. But for me, it's an overkill.
>
> Regards,
> Averell
>
> On Thu, 31 Oct. 2019, 8:19 pm kant kodali,  wrote:
>
>> Hi Averell,
>>
>> yes,  I want to run ad-hoc SQL queries on the joined data as well as data
>> that may join in the future. For example, let's say if you take datasets A
>> and B in streaming mode a row in A can join with a row B in some time in
>> future let's say but meanwhile if I query the intermediate state using SQL
>> I want the row in A that have not yet joined with B to also be available to
>> Query. so not just joined results alone but also data that might be join in
>> the future as well since its all streaming.
>>
>> Thanks!
>>
>


Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread kant kodali
Hi Averell,

Yes, I want to run ad-hoc SQL queries on the joined data as well as data
that may join in the future. For example, if you take datasets A and B in
streaming mode, a row in A can join with a row in B at some time in the
future, but meanwhile, if I query the intermediate state using SQL, I want
the rows in A that have not yet joined with B to also be available to
query. So not just the joined results alone, but also data that might join
in the future, since it's all streaming.

Thanks!


Are Dynamic tables backed by rocksdb?

2019-10-31 Thread kant kodali
Hi All,

Are dynamic tables backed by RocksDB or kept in memory? If they are backed by
RocksDB, can I use SQL to query the state?

Thanks!


Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread kant kodali
Hi Averell,

I want to write intermediate results (A join B) incrementally and in
real-time to some external storage so I can query it using SQL.
I am new to Flink, so I am trying to find out 1) whether such a mechanism
exists, and 2) if not, what the alternatives are.

Thanks

On Thu, Oct 31, 2019 at 1:42 AM Averell  wrote:

> Hi Kant,
>
> I wonder why you need to "source" your intermediate state from files? Why
> not "source" it from the previous operator? I.e. instead of (A join B) ->
> State -> files -> (C), why not do (A join B) -> State -> (files + C)?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread kant kodali
"I think you would have to implement your own custom operator that would
output changes to it’s internal state as a side output"

Yes, I am looking for this, but I am not sure how to do it. Should I use
a ProcessFunction (like in the event-driven applications)?

On Wed, Oct 30, 2019 at 8:53 AM Piotr Nowojski  wrote:

> Hi Kant,
>
> Checkpointing interval is configurable, but I wouldn’t count on it working
> well with even 10s intervals.
>
> I think what you are asking for is not supported by Flink generically. Maybe
> Queryable state I mentioned before? But I have never used it.
>
> I think you would have to implement your own custom operator that would
> output changes to it’s internal state as a side output.
>
> Piotrek
>
> On 30 Oct 2019, at 16:14, kant kodali  wrote:
>
> Hi Piotr,
>
> I am talking about the internal state. How often this state gets
> checkpointed? if it is every few seconds then it may not meet our real-time
> requirement(sub second).
>
> The question really is can I read this internal state in a streaming
> fashion in an update mode? The state processor API seems to expose DataSet
> but not DataStream so I am not sure how to read internal state in
> streaming fashion in an update mode?
>
> Thanks!
>
> On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I’m not sure what are you trying to achieve. What do you mean by “state
>> of full outer join”? The result of it? Or it’s internal state? Also keep in
>> mind, that internal state of the operators in Flink is already
>> snapshoted/written down to an external storage during checkpointing
>> mechanism.
>>
>> The result should be simple, just write it to some Sink.
>>
>> For the internal state, it sounds like you are doing something not the
>> way it was intended… having said that, you can try one of the following
>> options:
>> a) Implement your own outer join operator (might not be as easy if you
>> are using Table API/SQL) and just create a side output for the state
>> changes.
>> b) Use state processor API to read the content of a savepoint/checkpoint
>> [1][2]
>> c) Use queryable state [3] (I’m not sure about this, I have never used
>> queryable state)
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>> [2] https://flink.apache.org/feature/2019/09/13/state-processor-api.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>>
>> On 29 Oct 2019, at 16:42, kant kodali  wrote:
>>
>> Hi All,
>>
>> I want to do a full outer join on two streaming data sources and store
>> the state of full outer join in some external storage like rocksdb or
>> something else. And then want to use this intermediate state as a streaming
>> source again, do some transformation and write it to some external store.
>> is that possible with Flink 1.9?
>>
>> Also what storage systems support push mechanism for the intermediate
>> data? For example, In the use case above does rocksdb support push/emit
>> events in a streaming fashion?
>>
>> Thanks!
>>
>>
>>
>
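
A sketch of the side-output mechanism referred to in this thread, applied to a
simple stateful function rather than to a custom join operator (the join case
would require implementing that operator itself): every state update is also
emitted to a side output, which downstream code can consume as its own stream:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Sketch: forwards its input unchanged and additionally publishes each state
// change to a side output.
public class StateChangePublisher extends KeyedProcessFunction<String, String, String> {

    // created as an anonymous subclass so the element type is captured
    public static final OutputTag<String> STATE_CHANGES =
            new OutputTag<String>("state-changes") {};

    private transient ValueState<String> latest;

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        latest.update(value);
        ctx.output(STATE_CHANGES, value);   // publish the state change
        out.collect(value);                 // normal output continues unchanged
    }
}

The change stream is then obtained with
keyedStream.process(new StateChangePublisher())
           .getSideOutput(StateChangePublisher.STATE_CHANGES)
and can be written to an external store.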


Re: How to stream intermediate data that is stored in external storage?

2019-10-30 Thread kant kodali
Hi Piotr,

I am talking about the internal state. How often does this state get
checkpointed? If it is every few seconds, then it may not meet our real-time
requirement (sub-second).

The question really is: can I read this internal state in a streaming
fashion in an update mode? The State Processor API seems to expose DataSet
but not DataStream, so I am not sure how to read the internal state in a
streaming fashion in an update mode.

Thanks!

On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski  wrote:

> Hi,
>
> I’m not sure what are you trying to achieve. What do you mean by “state of
> full outer join”? The result of it? Or it’s internal state? Also keep in
> mind, that internal state of the operators in Flink is already
> snapshoted/written down to an external storage during checkpointing
> mechanism.
>
> The result should be simple, just write it to some Sink.
>
> For the internal state, it sounds like you are doing something not the way
> it was intended… having said that, you can try one of the following options:
> a) Implement your own outer join operator (might not be as easy if you are
> using Table API/SQL) and just create a side output for the state changes.
> b) Use state processor API to read the content of a savepoint/checkpoint
> [1][2]
> c) Use queryable state [3] (I’m not sure about this, I have never used
> queryable state)
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> [2] https://flink.apache.org/feature/2019/09/13/state-processor-api.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>
> On 29 Oct 2019, at 16:42, kant kodali  wrote:
>
> Hi All,
>
> I want to do a full outer join on two streaming data sources and store the
> state of full outer join in some external storage like rocksdb or something
> else. And then want to use this intermediate state as a streaming source
> again, do some transformation and write it to some external store. is
> that possible with Flink 1.9?
>
> Also what storage systems support push mechanism for the intermediate
> data? For example, In the use case above does rocksdb support push/emit
> events in a streaming fashion?
>
> Thanks!
>
>
>


How to stream intermediate data that is stored in external storage?

2019-10-29 Thread kant kodali
Hi All,

I want to do a full outer join on two streaming data sources and store the
state of full outer join in some external storage like rocksdb or something
else. And then want to use this intermediate state as a streaming source
again, do some transformation and write it to some external store. is
that possible with Flink 1.9?

Also what storage systems support push mechanism for the intermediate data?
For example, In the use case above does rocksdb support push/emit events in
a streaming fashion?

Thanks!


Re: can we do Flink CEP on event stream or batch or both?

2019-06-19 Thread kant kodali
Hi Fabian,

Actually, now that I have gone through my use case, I can say that the
equality matches are more like expressions.

For example: sum(col1, col2) of datasetA = col3 of datasetB.

And these expressions can include sum, if/else, trim, substring,
absolute_value, etc., and they are submitted by the user in an ad-hoc fashion.
My job is to apply these expressions on two different streams, identify
the breaks, and report them.

Any suggestions would be appreciated.

Thanks!



On Tue, Apr 30, 2019 at 2:20 AM Fabian Hueske  wrote:

> Hi,
>
> Stateful streaming applications are typically designed to run continuously
> (i.e., until forever or until they are not needed anymore or replaced).
> Many jobs run for weeks or months.
>
> IMO, using CEP for "simple" equality matches would add too much complexity
> for a use case that can be easily solved with a stateful function.
> If your task is to ensure that two streams have the same events, I'd
> recommend to implement a custom DataStream application with a stateful
> ProcessFunction.
> Holding state for two years is certainly possible if you know exactly
> which events to keep, i.e., you do not store the full stream but only those
> few events that have not had a match yet.
>
> If you need to run the same logic also on batch data, you might want to
> check if you can use SQL or the Table API which are designed to work on
> static and streaming data with the same processing semantics.
>
> Best,
> Fabian
>
>
> Am Di., 30. Apr. 2019 um 06:49 Uhr schrieb kant kodali  >:
>
>> Hi All,
>>
>> I have the following questions.
>>
>> 1) can we do Flink CEP on event stream or batch?
>> 2) If we can do streaming I wonder how long can we keep the stream
>> stateful? I also wonder if anyone successfully had done any stateful
>> streaming for days or months(with or without CEP)? or is stateful streaming
>> is mainly to keep state only for a few hours?
>>
>> I have a use case where events are ingested from multiple sources and in
>> theory, the sources are supposed to have the same events however in
>> practice the sources will not have the same events so when the events are
>> ingested from multiple sources the goal is to detect where the "breaks"
>> are(meaning the missing events like exists in one source but not in other)?
>> so I realize this is the typical case for CEP.
>>
>> Also, in this particular use case events that supposed to come 2 years
>> ago can come today and if so, need to update those events also in real time
>> or near real time. Sure there wouldn't be a lot of events that were missed
>> 2 years ago but there will be a few. What would be the best approach?
>>
>> One solution I can think of is to do Stateful CEP with a window of one
>> day or whatever short time period where most events will occur and collect
>> the events that fall beyond that time period(The late ones) into some Kafka
>> topic and have a separate stream analyze the time period of the late ones,
>> construct the corresponding NFA and run through it again.  Please let me
>> know how this sounds or if there is a better way to do it.
>>
>> Thanks!
>>
>>
>>
>>


https://github.com/google/zetasql

2019-05-21 Thread kant kodali
https://github.com/google/zetasql


can we do Flink CEP on event stream or batch or both?

2019-04-29 Thread kant kodali
Hi All,

I have the following questions.

1) can we do Flink CEP on event stream or batch?
2) If we can do streaming, I wonder how long we can keep the stream
stateful? I also wonder if anyone has successfully done any stateful
streaming for days or months (with or without CEP), or whether stateful
streaming is mainly meant to keep state only for a few hours?

I have a use case where events are ingested from multiple sources, and in
theory the sources are supposed to have the same events; in practice,
however, the sources will not have the same events. So when the events are
ingested from multiple sources, the goal is to detect where the "breaks"
are (meaning the missing events that exist in one source but not in the
other), so I realize this is the typical case for CEP.

Also, in this particular use case, events that were supposed to come 2 years
ago can come today, and if so, those events also need to be updated in real
time or near real time. Sure, there wouldn't be a lot of events that were
missed 2 years ago, but there will be a few. What would be the best approach?

One solution I can think of is to do Stateful CEP with a window of one day
or whatever short time period where most events will occur and collect the
events that fall beyond that time period(The late ones) into some Kafka
topic and have a separate stream analyze the time period of the late ones,
construct the corresponding NFA and run through it again.  Please let me
know how this sounds or if there is a better way to do it.

Thanks!


Re: status on FLINK-7129

2019-04-27 Thread kant kodali
Based on the comments, I get the feeling that this is not trivial, or perhaps
there isn't an optimal way forward. Is that correct?

Any ideas on how to get started on this?

On Tue, Apr 23, 2019 at 6:51 AM Hao Sun  wrote:

> +1
>
> On Tue, Apr 23, 2019, 05:18 Vishal Santoshi 
> wrote:
>
>> +1
>>
>> On Tue, Apr 23, 2019, 4:57 AM kant kodali  wrote:
>>
>>> Thanks all for the reply. I believe this is one of the most important
>>> feature that differentiates flink from other stream processing engines as
>>> others don't even have CEP yet. so it would be great if this issue can get
>>> more attention as I don't think anyone want's to restarts the Job every
>>> time they want to detect a new pattern.
>>>
>>> On Mon, Apr 22, 2019 at 11:30 PM Dawid Wysakowicz <
>>> dwysakow...@apache.org> wrote:
>>>
>>>> Hi Kant,
>>>>
>>>> I'm afraid Konstantin is right. Unfortunately AFAIK there is no active
>>>> development on that issue.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>> On 22/04/2019 18:20, Konstantin Knauf wrote:
>>>>
>>>> Hi Kant,
>>>>
>>>> as far as I know, no one is currently working on this. Dawid (cc) maybe
>>>> knows more.
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> On Sat, Apr 20, 2019 at 12:12 PM kant kodali 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> There seems to be a lot of interest for
>>>>> https://issues.apache.org/jira/browse/FLINK-7129
>>>>>
>>>>> Any rough idea on the status of this issue?
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Konstantin Knauf | Solutions Architect
>>>>
>>>> +49 160 91394525
>>>>
>>>> Planned Absences: 17.04.2019 - 26.04.2019
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB
>>>> 158244 B Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>>>
>>>>


Re: status on FLINK-7129

2019-04-23 Thread kant kodali
Thanks all for the replies. I believe this is one of the most important
features that differentiate Flink from other stream processing engines, as
others don't even have CEP yet. So it would be great if this issue could get
more attention, as I don't think anyone wants to restart the job every time
they want to detect a new pattern.

On Mon, Apr 22, 2019 at 11:30 PM Dawid Wysakowicz 
wrote:

> Hi Kant,
>
> I'm afraid Konstantin is right. Unfortunately AFAIK there is no active
> development on that issue.
>
> Best,
>
> Dawid
> On 22/04/2019 18:20, Konstantin Knauf wrote:
>
> Hi Kant,
>
> as far as I know, no one is currently working on this. Dawid (cc) maybe
> knows more.
>
> Cheers,
>
> Konstantin
>
> On Sat, Apr 20, 2019 at 12:12 PM kant kodali  wrote:
>
>> Hi All,
>>
>> There seems to be a lot of interest for
>> https://issues.apache.org/jira/browse/FLINK-7129
>>
>> Any rough idea on the status of this issue?
>>
>> Thanks!
>>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> Planned Absences: 17.04.2019 - 26.04.2019
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B 
> Managing
> Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
>


status on FLINK-7129

2019-04-20 Thread kant kodali
Hi All,

There seems to be a lot of interest in
https://issues.apache.org/jira/browse/FLINK-7129

Any rough idea on the status of this issue?

Thanks!


Re: Does Flink support stream-stream outer joins in the latest version?

2018-03-07 Thread kant kodali
Hi Cheng,

The docs here
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins>
state that full outer joins are only available for batch (I am not sure I am
reading that correctly). I am trying to understand how two unbounded streams
can be joined like a batch. If we have to do a batch join, then the input *must
be* bounded, right? If so, how do we bound it? I can see that a time window is
one way to bound, but beyond that, if I execute the join query below on an
unbounded stream, I am not even sure how it would work: a row from one table
can join with a row from the other table, and that row can arrive at any time
in the future if the stream is unbounded. So I am sorry, I am failing to
understand.


SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId

Thanks!

On Wed, Mar 7, 2018 at 3:49 AM, Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi kant,
>
> It seems that you mean the Time-windowed Join. The Time-windowed Joins are 
> supported
> now. You can check more details with the docs given by Xingcan.
> As for the non-window join, it is used to join two unbounded stream and
> the semantic is very like batch join.
>
> Time-windowed Join:
>
>> SELECT *
>> FROM Orders o, Shipments s
>> WHERE o.id = s.orderId AND
>>   o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
>
>
> Non-windowed Join:
>
>> SELECT *
>> FROM Orders o, Shipments s
>> WHERE o.id = s.orderId
>
>
> On Wed, Mar 7, 2018 at 7:02 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi!
>>
>> Thanks for all this. and yes I was indeed talking about SQL/Table API so
>> I will keep track of these tickets! BTW, What is non-windowed Join? I
>> thought stream-stream-joins by default is a stateful operation so it has to
>> be within some time window right? Also does the output of stream-stream
>> joins emit every time so we can see the state of the join at any given time
>> or only when the watermark elapses and join result fully materializes?
>>
>> On a side note, Full outer join seems to be the most useful for my use
>> case. so the moment its available in master I can start playing and testing
>> it!
>>
>> On Tue, Mar 6, 2018 at 10:39 PM, Hequn Cheng <chenghe...@gmail.com>
>> wrote:
>>
>>> Hi Kant,
>>>
>>> The stream-stream outer joins are work in progress now(left/right/full),
>>> and will probably be ready before the end of this month. You can check the
>>> progress from[1].
>>>
>>> Best, Hequn
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-5878
>>>
>>> On Wed, Mar 7, 2018 at 1:01 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>>>
>>>> Hi Kant,
>>>>
>>>> I suppose you refer to the stream join in SQL/Table API since the outer
>>>> join for windowed-streams can always be achieved with the `JoinFunction` in
>>>> DataStream API.
>>>>
>>>> There are two kinds of stream joins, namely, the time-windowed join and
>>>> the non-windowed join in Flink SQL/Table API [1, 2]. The time-windowed
>>>> outer join has been supported since version 1.5 and the non-windowed outer
>>>> join is still work in progress.
>>>>
>>>> Hope that helps.
>>>>
>>>> Best,
>>>> Xingcan
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/d
>>>> ev/table/tableApi.html#joins
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/d
>>>> ev/table/sql.html#joins
>>>>
>>>>
>>>> On 7 Mar 2018, at 12:45 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>> Hi All,
>>>>
>>>> Does Flink support stream-stream outer joins in the latest version?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>
>>
>


Re: Does Flink support stream-stream outer joins in the latest version?

2018-03-07 Thread kant kodali
Hi!

Thanks for all this, and yes, I was indeed talking about the SQL/Table API, so
I will keep track of these tickets! BTW, what is a non-windowed join? I thought
a stream-stream join is by default a stateful operation, so it has to be within
some time window, right? Also, does the output of a stream-stream join emit
continuously, so we can see the state of the join at any given time, or only
once the watermark passes and the join result fully materializes?

On a side note, a full outer join seems to be the most useful for my use case,
so the moment it is available in master I can start playing with and testing
it!

On Tue, Mar 6, 2018 at 10:39 PM, Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Kant,
>
> The stream-stream outer joins are work in progress now(left/right/full),
> and will probably be ready before the end of this month. You can check the
> progress from[1].
>
> Best, Hequn
>
> [1] https://issues.apache.org/jira/browse/FLINK-5878
>
> On Wed, Mar 7, 2018 at 1:01 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi Kant,
>>
>> I suppose you refer to the stream join in SQL/Table API since the outer
>> join for windowed-streams can always be achieved with the `JoinFunction` in
>> DataStream API.
>>
>> There are two kinds of stream joins, namely, the time-windowed join and
>> the non-windowed join in Flink SQL/Table API [1, 2]. The time-windowed
>> outer join has been supported since version 1.5 and the non-windowed outer
>> join is still work in progress.
>>
>> Hope that helps.
>>
>> Best,
>> Xingcan
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/tableApi.html#joins
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/sql.html#joins
>>
>>
>> On 7 Mar 2018, at 12:45 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>> Hi All,
>>
>> Does Flink support stream-stream outer joins in the latest version?
>>
>> Thanks!
>>
>>
>>
>


Does Flink support stream-stream outer joins in the latest version?

2018-03-06 Thread kant kodali
Hi All,

Does Flink support stream-stream outer joins in the latest version?

Thanks!


Re: Does Queryable State only support K/V queries not SQL?

2018-03-03 Thread kant kodali
Hi Fabian,

Does it make sense to put it on the roadmap? That way, external applications
could run ad-hoc queries against the most recent stream data and avoid writing
to an external database.
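
For reference, what is possible today is a key point query from an external
process, roughly like the sketch below (it assumes the
flink-queryable-state-client-java dependency and a state registered via
asQueryableState in the running job; the host, port, JobID, state name, and key
are placeholders):

import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.queryablestate.client.QueryableStateClient

object QueryableStateLookup {
  def main(args: Array[String]): Unit = {
    // Connects to the queryable-state proxy on a TaskManager (default port 9069).
    val client = new QueryableStateClient("tm-host", 9069)

    // Must match the descriptor used when the state was made queryable in the job.
    val descriptor = new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long])

    val future = client.getKvState(
      JobID.fromHexString("0123456789abcdef0123456789abcdef"), // the running job's ID
      "query-name",                                            // name given to asQueryableState(...)
      "some-key",                                              // the key to look up
      BasicTypeInfo.STRING_TYPE_INFO,
      descriptor)

    // Blocks until the state value for that key arrives.
    println(future.join().value())
  }
}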

Thanks!


On Mon, Feb 26, 2018 at 12:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Queryable state only supports key point queries, i.e., you can query a
> keyed state for the value of a key.
> Support for SQL is not on the roadmap.
>
> Best, Fabian
>
> 2018-02-25 14:26 GMT+01:00 kant kodali <kanth...@gmail.com>:
>
>> Hi All,
>>
>> 1) Does Queryable State support SQL? By which I mean I can do issue a
>> full-fledged sql query like say ("select * from table where foo='hello'
>> group by name")
>>
>> 2) Does Queryable state support offset and limit? Because if I have a
>> million rows I don't want to get all at once.
>>
>> Sorry if these are naive questions I am new to flink but I have used
>> other streaming processors before.
>>
>> Thanks!
>>
>
>


Re: Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

2018-02-26 Thread kant kodali
Thanks a lot!

On Mon, Feb 26, 2018 at 9:19 AM, Nico Kruber <n...@data-artisans.com> wrote:

> Judging from the code, you should separate different jars with a colon
> ":", i.e. "—addclasspath jar1:jar2"
>
>
> Nico
>
> On 26/02/18 10:36, kant kodali wrote:
> > Hi Gordon,
> >
> > Thanks for the response!! How do I add multiple jars to the classpaths?
> > Are they separated by a semicolon and still using one flag like
> > "—addclasspath jar1; jar2" or specify the flag multiple times like
> > "—addclasspath jar1 —addclasspath jar2" or specify just the directory
> > "—addclasspath ./opt" so it adds all the jars in that directory!
> >
> > Thanks!
> >
> > On Sun, Feb 25, 2018 at 11:29 PM, Tzu-Li (Gordon) Tai
> > <tzuli...@apache.org <mailto:tzuli...@apache.org>> wrote:
> >
> > Hi,
> >
> > Good to see that you have it working! Yes, each of the Kafka
> > version-specific connectors also have a dependency on the base Kafka
> > connector module.
> >
> > Note that it is usually not recommended to put optional dependencies
> > (such as the connectors) under the lib folder.
> > To add additional dependencies when using the Scala shell, there is
> > a “—addclasspath” option which allows you to specify paths to the
> > dependency jars.
> >
> > Cheers,
> > Gordon
> >
> >
> > On 25 February 2018 at 12:22:28 PM, kant kodali (kanth...@gmail.com
> > <mailto:kanth...@gmail.com>) wrote:
> >
> >> Exception went away after
> >> downloading flink-connector-kafka-base_2.11-1.4.1.jar to lib folder
> >>
> >> On Sat, Feb 24, 2018 at 6:36 PM, kant kodali <kanth...@gmail.com
> >> <mailto:kanth...@gmail.com>> wrote:
> >>
> >> Hi,
> >>
> >> I couldn't get flink and kafka working together. It looks like
> >> all examples I tried from web site fails with the following
> >> Exception.
> >>
> >> Caused by: java.lang.ClassNotFoundException:
> >> org.apache.flink.streaming.connectors.kafka.
> FlinkKafkaConsumerBase
> >>
> >>
> >> *or when I do something like this like it is in the website*
> >>
> >>
> >>  val stream = senv.addSource(new
> >> FlinkKafkaConsumer08[String]("join_test", new
> >> SimpleStringSchema(), properties)).print()
> >>
> >> *I get the following exception*
> >>
> >> :73: error: overloaded method value addSource with
> >> alternatives:
> >>
> >>   [T](function:
> >> org.apache.flink.streaming.api.functions.source.
> SourceFunction.SourceContext[T]
> >> => Unit)(implicit evidence$10:
> >> org.apache.flink.api.common.typeinfo.TypeInformation[T])
> org.apache.flink.streaming.api.scala.DataStream[T]
> >> 
> >>
> >>   [T](function:
> >> org.apache.flink.streaming.api.functions.source.
> SourceFunction[T])(implicit
> >> evidence$9:
> >> org.apache.flink.api.common.typeinfo.TypeInformation[T])
> org.apache.flink.streaming.api.scala.DataStream[T]
> >>
> >>  cannot be applied to (org.apache.flink.streaming.co
> >> <http://org.apache.flink.streaming.co>nnectors.kafka.
> FlinkKafkaConsumer08[String])
> >>
> >>val stream = senv.addSource(new
> >> FlinkKafkaConsumer08[String]("join_test", new
> >> SimpleStringSchema(), properties)).print()
> >>
> >>
> >> can anyone share a simple example of how to get Kafka Stream
> >> as a Table using scala shell? No need for any fancy schema
> >> just needs to print the value. I am using the latest version
> >> of flink 1.41 and my lib folder
> >> containers flink-connector-kafka-0.8_2.11-1.4.1.jar
> >>
> >> I wanted to use Kafka 0.9 but that didn't work so I thought
> >> let me just get something working first and downgraded to 0.8
> >> but 0.8 examples on the website also don't seem to work using
> >> scala shell.
> >>
> >> Thanks!!
> >>
> >>
> >>
> >>
> >
>
>


Re: Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

2018-02-26 Thread kant kodali
Hi Gordon,

Thanks for the response!! How do I add multiple jars to the classpath? Do I
separate them with a semicolon under one flag, like "--addclasspath jar1;jar2",
specify the flag multiple times, like "--addclasspath jar1 --addclasspath
jar2", or just point at a directory, like "--addclasspath ./opt", so it picks
up all the jars in that directory?

Thanks!

On Sun, Feb 25, 2018 at 11:29 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> Good to see that you have it working! Yes, each of the Kafka
> version-specific connectors also have a dependency on the base Kafka
> connector module.
>
> Note that it is usually not recommended to put optional dependencies (such
> as the connectors) under the lib folder.
> To add additional dependencies when using the Scala shell, there is a
> “—addclasspath” option which allows you to specify paths to the dependency
> jars.
>
> Cheers,
> Gordon
>
>
> On 25 February 2018 at 12:22:28 PM, kant kodali (kanth...@gmail.com)
> wrote:
>
> Exception went away after downloading 
> flink-connector-kafka-base_2.11-1.4.1.jar
> to lib folder
>
> On Sat, Feb 24, 2018 at 6:36 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi,
>>
>> I couldn't get flink and kafka working together. It looks like all
>> examples I tried from web site fails with the following Exception.
>>
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
>>
>>
>> *or when I do something like this like it is in the website*
>>
>>
>>  val stream = senv.addSource(new FlinkKafkaConsumer08[String]("join_test",
>> new SimpleStringSchema(), properties)).print()
>>
>> *I get the following exception*
>>
>> :73: error: overloaded method value addSource with alternatives:
>>
>>   [T](function: org.apache.flink.streaming.api
>> .functions.source.SourceFunction.SourceContext[T] => Unit)(implicit
>> evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.
>> apache.flink.streaming.api.scala.DataStream[T] 
>>
>>   [T](function: org.apache.flink.streaming.api
>> .functions.source.SourceFunction[T])(implicit evidence$9:
>> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.
>> apache.flink.streaming.api.scala.DataStream[T]
>>
>>  cannot be applied to (org.apache.flink.streaming.co
>> nnectors.kafka.FlinkKafkaConsumer08[String])
>>
>>val stream = senv.addSource(new 
>> FlinkKafkaConsumer08[String]("join_test",
>> new SimpleStringSchema(), properties)).print()
>>
>> can anyone share a simple example of how to get Kafka Stream as a Table
>> using scala shell? No need for any fancy schema just needs to print the
>> value. I am using the latest version of flink 1.41 and my lib folder
>> containers flink-connector-kafka-0.8_2.11-1.4.1.jar
>>
>> I wanted to use Kafka 0.9 but that didn't work so I thought let me just
>> get something working first and downgraded to 0.8 but 0.8 examples on the
>> website also don't seem to work using scala shell.
>>
>> Thanks!!
>>
>>
>>
>>
>


Does Queryable State only support K/V queries not SQL?

2018-02-25 Thread kant kodali
Hi All,

1) Does Queryable State support SQL? By which I mean, can I issue a
full-fledged SQL query, say ("select * from table where foo='hello' group by
name")?

2) Does Queryable State support offset and limit? Because if I have a million
rows, I don't want to fetch them all at once.

Sorry if these are naive questions; I am new to Flink, but I have used other
stream processors before.

Thanks!


Re: Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

2018-02-24 Thread kant kodali
The exception went away after downloading
flink-connector-kafka-base_2.11-1.4.1.jar to the lib folder.
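
For reference, with flink-connector-kafka-0.8_2.11-1.4.1.jar and the base
connector jar available, a minimal end-to-end check in the shell looks roughly
like this (broker/ZooKeeper addresses and the topic name are placeholders):

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")  // Kafka brokers
properties.setProperty("zookeeper.connect", "localhost:2181")  // required by the 0.8 consumer
properties.setProperty("group.id", "flink-shell-test")

// senv is the StreamExecutionEnvironment pre-bound by start-scala-shell.sh
senv
  .addSource(new FlinkKafkaConsumer08[String]("join_test", new SimpleStringSchema(), properties))
  .print()

senv.execute("kafka-shell-test")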

On Sat, Feb 24, 2018 at 6:36 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi,
>
> I couldn't get flink and kafka working together. It looks like all
> examples I tried from web site fails with the following Exception.
>
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.
> connectors.kafka.FlinkKafkaConsumerBase
>
>
> *or when I do something like this like it is in the website*
>
>
>  val stream = senv.addSource(new FlinkKafkaConsumer08[String]("join_test",
> new SimpleStringSchema(), properties)).print()
>
> *I get the following exception*
>
> :73: error: overloaded method value addSource with alternatives:
>
>   [T](function: org.apache.flink.streaming.api.functions.source.
> SourceFunction.SourceContext[T] => Unit)(implicit evidence$10:
> org.apache.flink.api.common.typeinfo.TypeInformation[T])
> org.apache.flink.streaming.api.scala.DataStream[T] 
>
>   [T](function: 
> org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit
> evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])
> org.apache.flink.streaming.api.scala.DataStream[T]
>
>  cannot be applied to (org.apache.flink.streaming.connectors.kafka.
> FlinkKafkaConsumer08[String])
>
>val stream = senv.addSource(new 
> FlinkKafkaConsumer08[String]("join_test",
> new SimpleStringSchema(), properties)).print()
>
> can anyone share a simple example of how to get Kafka Stream as a Table
> using scala shell? No need for any fancy schema just needs to print the
> value. I am using the latest version of flink 1.41 and my lib folder
> containers flink-connector-kafka-0.8_2.11-1.4.1.jar
>
> I wanted to use Kafka 0.9 but that didn't work so I thought let me just
> get something working first and downgraded to 0.8 but 0.8 examples on the
> website also don't seem to work using scala shell.
>
> Thanks!!
>
>
>
>


Re: How to create TableEnvironment using scala-shell

2018-02-24 Thread kant kodali
Please ignore this. I fixed it by moving opt/flink-table_2.11-1.4.1.jar to
lib/flink-table_2.11-1.4.1.jar
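
For anyone hitting the same import error: once the jar is under lib/, a quick
end-to-end check in the shell looks roughly like this (a sketch using a
throwaway in-memory stream; senv is the environment pre-bound by the shell):

import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val tableEnv = TableEnvironment.getTableEnvironment(senv)

// A throwaway in-memory stream, just to confirm the Table API works end to end.
val words = senv.fromElements((1, "hello"), (2, "world"))
val table = tableEnv.fromDataStream(words, 'id, 'word)

table.filter('id > 1).toAppendStream[(Int, String)].print()
senv.execute("table-shell-test")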

On Sat, Feb 24, 2018 at 4:06 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi All,
>
> I am new to Flink and I am wondering how to create a TableEnvironment in
> scala-shell?  I get an import error below. I am using Flink 1.4.1
>
> 63:error: object table is not a member of package org.apache.flink`
>
>
> I tried to do the following
>
> ./start-scala-shell.sh local
>
> import org.apache.flink.table.api.scala._
>
> import org.apache.flink.table.api.TableEnvironment
>
>
> val tableEnv = TableEnvironment.getTableEnvironment(senv)
>
>
> Thanks!!
>
>
>


Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

2018-02-24 Thread kant kodali
Hi,

I couldn't get Flink and Kafka working together. It looks like all the examples
I tried from the website fail with the following exception.

Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase


*or, when I do something like the following, as shown on the website,*


 val stream = senv.addSource(new FlinkKafkaConsumer08[String]("join_test",
new SimpleStringSchema(), properties)).print()

*I get the following exception*

:73: error: overloaded method value addSource with alternatives:

  [T](function:
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T]
=> Unit)(implicit evidence$10:
org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]


  [T](function:
org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit
evidence$9:
org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]

 cannot be applied to
(org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08[String])

   val stream = senv.addSource(new
FlinkKafkaConsumer08[String]("join_test", new SimpleStringSchema(),
properties)).print()

Can anyone share a simple example of how to get a Kafka stream as a Table using
the Scala shell? No need for any fancy schema; it just needs to print the
value. I am using the latest version, Flink 1.4.1, and my lib folder contains
flink-connector-kafka-0.8_2.11-1.4.1.jar.

I wanted to use Kafka 0.9, but that didn't work, so I decided to get something
working first and downgraded to 0.8; however, the 0.8 examples on the website
also don't seem to work from the Scala shell.

Thanks!!


How to create TableEnvironment using scala-shell

2018-02-24 Thread kant kodali
Hi All,

I am new to Flink and I am wondering how to create a TableEnvironment in the
scala-shell. I get the import error below. I am using Flink 1.4.1.

63: error: object table is not a member of package org.apache.flink


I tried to do the following

./start-scala-shell.sh local

import org.apache.flink.table.api.scala._

import org.apache.flink.table.api.TableEnvironment


val tableEnv = TableEnvironment.getTableEnvironment(senv)


Thanks!!


Re: Does Flink have a JDBC server where I can submit Calcite Streaming Queries?

2017-09-08 Thread kant kodali
Got it! Thanks a lot for your detailed explanation.
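
A minimal sketch of the kind of parameterized client program Fabian describes
below (it assumes the Flink 1.4-era Table API; the registered table, the sample
query, and the Kafka broker/topic are placeholders, and only append-only
queries are handled):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

case class Order(id: String, product: String, amount: Int)

object AdHocSqlClient {
  def main(args: Array[String]): Unit = {
    // The ad-hoc query is passed in at submission time, e.g.
    // "SELECT id, product FROM Orders WHERE amount > 2"
    val query = args(0)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Stand-in for a real Kafka-backed stream of orders.
    val orders = env.fromElements(Order("o1", "books", 3), Order("o2", "games", 1))
    tableEnv.registerDataStream("Orders", orders, 'id, 'product, 'amount)

    // Run the query and ship the (append-only) result to Kafka as strings.
    tableEnv.sql(query)
      .toAppendStream[Row]
      .map(_.toString)
      .addSink(new FlinkKafkaProducer010[String](
        "localhost:9092", "sql-results", new SimpleStringSchema()))

    env.execute(s"ad-hoc query: $query")
  }
}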

On Fri, Sep 8, 2017 at 1:27 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> As I said, there is no such functionality built into Flink yet.
>
> A client program can be parameterized with a query and turned into a SQL
> client that way.
> The submission would work with the regular Flink job client, i.e., it
> would pickup the regular Flink config.
>
> Best, Fabian
>
> 2017-09-08 10:05 GMT+02:00 kant kodali <kanth...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for the response. I understand the common approach is to write a
>> client program and run it however this will not allow me to send queries
>> Ad-hoc so Is there anyway for me to submit Calcite SQL to Flink via REST or
>> whatever mechanism? Forgot even, the result set once I know there is a way
>> to submit Adhoc queries I can figure out a way to write to Kafka.
>>
>> Thanks,
>> kant
>>
>> On Fri, Sep 8, 2017 at 12:52 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Kant,
>>>
>>> no, there is no such functionality.
>>> I'm also not sure how well streaming would work together with the JDBC
>>> interface. JDBC has not been designed for continuous streaming queries,
>>> i.e., queries that never terminate.
>>> Challenges would be to have an infinite, streamable ResultSet (which
>>> might be possible) and how to represent retractions, i.e., updates of
>>> previously emitted results (I doubt this would work).
>>>
>>> If a streamable ResultSet was possible, a subset of queries (those that
>>> only produce new rows and never have to update emitted results) could be
>>> supported.
>>>
>>> Right now, the approach would be to implement a client program that
>>> executes queries and writes their result to a destination like Kafka or a
>>> database using a TableSink [1].
>>> The community is also discussing a SQL client to submits queries. [2]
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/table/sourceSinks.html
>>> [2] https://issues.apache.org/jira/browse/FLINK-7594
>>>
>>>
>>> 2017-09-07 21:43 GMT+02:00 kant kodali <kanth...@gmail.com>:
>>>
>>>> Hi All,
>>>>
>>>> Does Flink has a JDBC server where I can submit Calcite Streaming
>>>> Queries? such that I get Stream of responses back from Flink forever via
>>>> JDBC ? What is the standard way to do this?
>>>>
>>>> Thanks,
>>>> Kant
>>>>
>>>
>>>
>>
>


Re: Does Flink have a JDBC server where I can submit Calcite Streaming Queries?

2017-09-08 Thread kant kodali
Hi Fabian,

Thanks for the response. I understand the common approach is to write a client
program and run it; however, that does not let me send queries ad hoc. So is
there any way for me to submit Calcite SQL to Flink via REST or some other
mechanism? Forget the result set for now; once I know there is a way to submit
ad-hoc queries, I can figure out a way to write the results to Kafka.

Thanks,
kant

On Fri, Sep 8, 2017 at 12:52 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Kant,
>
> no, there is no such functionality.
> I'm also not sure how well streaming would work together with the JDBC
> interface. JDBC has not been designed for continuous streaming queries,
> i.e., queries that never terminate.
> Challenges would be to have an infinite, streamable ResultSet (which might
> be possible) and how to represent retractions, i.e., updates of previously
> emitted results (I doubt this would work).
>
> If a streamable ResultSet was possible, a subset of queries (those that
> only produce new rows and never have to update emitted results) could be
> supported.
>
> Right now, the approach would be to implement a client program that
> executes queries and writes their result to a destination like Kafka or a
> database using a TableSink [1].
> The community is also discussing a SQL client to submits queries. [2]
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/
> sourceSinks.html
> [2] https://issues.apache.org/jira/browse/FLINK-7594
>
>
> 2017-09-07 21:43 GMT+02:00 kant kodali <kanth...@gmail.com>:
>
>> Hi All,
>>
>> Does Flink has a JDBC server where I can submit Calcite Streaming
>> Queries? such that I get Stream of responses back from Flink forever via
>> JDBC ? What is the standard way to do this?
>>
>> Thanks,
>> Kant
>>
>
>


Does Flink have a JDBC server where I can submit Calcite Streaming Queries?

2017-09-07 Thread kant kodali
Hi All,

Does Flink have a JDBC server where I can submit Calcite streaming queries,
such that I get a stream of responses back from Flink forever via JDBC? What is
the standard way to do this?

Thanks,
Kant


Re: can flink do streaming from data sources other than Kafka?

2017-09-07 Thread kant kodali
Yes, I can indeed create them, but I wonder if that is even possible? I haven't
seen any framework doing this as of today. Flink has something called
AsyncDataStream, and I wonder if it can be leveraged to create a stream out of
a Cassandra source.
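
As far as I understand it, AsyncDataStream enriches an existing stream with
asynchronous lookups (for example against Cassandra) rather than acting as a
source itself. A rough sketch of that pattern (it assumes Flink 1.5+ for the
Scala async API; the lookup is a hypothetical stand-in for the DataStax
driver's executeAsync):

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Enriches each key with a value looked up asynchronously. The lookup here is a
// placeholder; a real implementation would fire the Cassandra driver's
// executeAsync() and complete the ResultFuture from its callback.
class CassandraEnricher extends AsyncFunction[String, (String, String)] {
  override def asyncInvoke(key: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    val lookedUpValue = s"row-for-$key" // stand-in for the async query result
    resultFuture.complete(Iterable((key, lookedUpValue)))
  }
}

object AsyncCassandraLookup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val keys = env.fromElements("k1", "k2", "k3")

    val enriched = AsyncDataStream.unorderedWait(
      keys, new CassandraEnricher, 1000L, TimeUnit.MILLISECONDS, 10)

    enriched.print()
    env.execute("async-cassandra-lookup")
  }
}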

Thanks!

On Thu, Sep 7, 2017 at 1:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Ah, I see. I’m not aware of any existing work / JIRAs on streaming sources
> for Cassandra or HBase, only sinks.
> If you are interested in one, could you open JIRAs for them?
>
>
> On 7 September 2017 at 4:11:05 PM, kant kodali (kanth...@gmail.com) wrote:
>
> Hi Gordon,
>
> Thanks for the response, I did go over the links for sources and sinks
> prior to posting my question. Maybe, I didn't get my question across
> correctly so let me rephrase it. Can I get data out of data stores like
> Cassandra, Hbase in a streaming manner? coz, currently more or less all the
> sources are of message queue family.
>
> Thanks,
> Kant
>
> On Thu, Sep 7, 2017 at 1:04 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi!
>>
>>
>> I am wondering if Flink can do streaming from data sources other than
>> Kafka. For example can Flink do streaming from a database like Cassandra,
>> HBase, MongoDb to sinks like says Elastic search or Kafka.
>>
>>
>> Yes, Flink currently supports various connectors for different sources
>> and sinks. For an overview you can check out this documentation [1]
>> Apache Bahir [2] also maintains some Flink connectors and is released
>> separately.
>>
>> Also for out of core stateful streaming. Is RocksDB the only option?
>>
>> Currently, RocksDB is the only option for out-of-core state. There was
>> some previous discussion for a Cassandra state backend, though [3].
>>
>> - Gordon
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>> 1.3/dev/connectors/index.html
>> [2] http://bahir.apache.org/
>> [3] https://issues.apache.org/jira/browse/FLINK-4266
>>
>> On 7 September 2017 at 2:58:38 PM, kant kodali (kanth...@gmail.com)
>> wrote:
>>
>> Hi All,
>>
>> I am wondering if Flink can do streaming from data sources other than
>> Kafka. For example can Flink do streaming from a database like Cassandra,
>> HBase, MongoDb to sinks like says Elastic search or Kafka.
>>
>> Also for out of core stateful streaming. Is RocksDB the only option? Can
>> I use some other key value store that has SQL interface (since RocksDB
>> doesn't)?
>>
>> Thanks,
>> kant
>>
>>
>


Re: can flink do streaming from data sources other than Kafka?

2017-09-07 Thread kant kodali
Hi Gordon,

Thanks for the response. I did go over the links for sources and sinks before
posting my question. Maybe I didn't get my question across correctly, so let me
rephrase it: can I get data out of data stores like Cassandra and HBase in a
streaming manner? Because currently, more or less all the sources belong to the
message-queue family.

Thanks,
Kant

On Thu, Sep 7, 2017 at 1:04 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi!
>
>
> I am wondering if Flink can do streaming from data sources other than
> Kafka. For example can Flink do streaming from a database like Cassandra,
> HBase, MongoDb to sinks like says Elastic search or Kafka.
>
>
> Yes, Flink currently supports various connectors for different sources and
> sinks. For an overview you can check out this documentation [1]
> Apache Bahir [2] also maintains some Flink connectors and is released
> separately.
>
> Also for out of core stateful streaming. Is RocksDB the only option?
>
> Currently, RocksDB is the only option for out-of-core state. There was
> some previous discussion for a Cassandra state backend, though [3].
>
> - Gordon
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/connectors/index.html
> [2] http://bahir.apache.org/
> [3] https://issues.apache.org/jira/browse/FLINK-4266
>
> On 7 September 2017 at 2:58:38 PM, kant kodali (kanth...@gmail.com) wrote:
>
> Hi All,
>
> I am wondering if Flink can do streaming from data sources other than
> Kafka. For example can Flink do streaming from a database like Cassandra,
> HBase, MongoDb to sinks like says Elastic search or Kafka.
>
> Also for out of core stateful streaming. Is RocksDB the only option? Can I
> use some other key value store that has SQL interface (since RocksDB
> doesn't)?
>
> Thanks,
> kant
>
>


can flink do streaming from data sources other than Kafka?

2017-09-07 Thread kant kodali
Hi All,

I am wondering if Flink can stream from data sources other than Kafka. For
example, can Flink stream from a database like Cassandra, HBase, or MongoDB to
sinks such as Elasticsearch or Kafka?

Also, for out-of-core stateful streaming, is RocksDB the only option? Can I use
some other key-value store that has a SQL interface (since RocksDB doesn't)?

Thanks,
kant


Comparison between Flink vs Kafka Stream Processing

2017-04-11 Thread kant kodali
Hi All,

I have a simple question. Here is an article that addresses the differences
between Flink and Kafka Streams (in fact there is a comparison table if you
scroll down). While I understand those are the differences, I hardly find them
useful, because if a unit of work can be done either way, one can never be
certain why a particular way was picked. So my question really is, and it would
be really helpful to know: what is possible with Flink that is not possible
with Kafka Streams?

Thanks!


Hi

2017-04-07 Thread kant kodali
Hi All,

I read the docs; however, I still have the following question: is HDFS
mandatory for stateful stream processing? In some places I see it is required,
and in other places I see that RocksDB can be used. I just want to know whether
HDFS is mandatory for stateful stream processing.
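
As far as I know, these are two separate concerns: RocksDB is a state backend
that keeps the working state on the TaskManager's local disk, while the
checkpoint URI decides where snapshots are written, and that can be HDFS, S3,
or even a local path, so HDFS itself is not strictly required. A minimal sketch
(assuming Flink 1.3+ and the flink-statebackend-rocksdb dependency; the
checkpoint path is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

object StatefulJobSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // RocksDB keeps the working state on local disk; the URI below only says
    // where checkpoints are written. Any supported filesystem works (file://,
    // hdfs://, s3://, ...); shared storage is only needed when recovery may
    // happen on a different machine.
    env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true))
    env.enableCheckpointing(60 * 1000) // checkpoint every minute

    // Placeholder pipeline; the real stateful job goes here.
    env.fromElements(1, 2, 3).map(_ * 2).print()
    env.execute("stateful-job")
  }
}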

Thanks!


Re: Can we do batch writes on cassandra using flink while leveraging the locality?

2016-10-28 Thread kant kodali
The Spark Cassandra connector does it! But I don't think it really implements a
custom partitioner; I think it just leverages the token-aware policy and, by
default, batches writes within a partition, though you can also batch across
partitions that share the same replica.
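
A rough sketch of that write pattern with the DataStax Java driver 3.x (the
contact point, keyspace/table, and rows are made up): statements are grouped by
partition key and sent as per-partition UNLOGGED batches, so the token-aware
policy can route each batch to a replica that owns the partition.

import com.datastax.driver.core.{BatchStatement, Cluster}
import com.datastax.driver.core.policies.{DCAwareRoundRobinPolicy, TokenAwarePolicy}

object PartitionedBatchWriter {
  def main(args: Array[String]): Unit = {
    val cluster = Cluster.builder()
      .addContactPoint("127.0.0.1")
      .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
      .build()
    val session = cluster.connect()
    val insert = session.prepare("INSERT INTO ks.events (id, seq, payload) VALUES (?, ?, ?)")

    val rows = Seq(("a", 1, "x"), ("a", 2, "y"), ("b", 1, "z"))

    // One UNLOGGED batch per partition key ("id" here), so each batch stays on a
    // single partition and does not force the coordinator to fan the writes out.
    rows.groupBy(_._1).values.foreach { group =>
      val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
      group.foreach { case (id, seq, payload) => batch.add(insert.bind(id, Int.box(seq), payload)) }
      session.execute(batch)
    }

    cluster.close()
  }
}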

On Thu, Oct 27, 2016 at 8:41 AM, Shannon Carey <sca...@expedia.com> wrote:

> It certainly seems possible to write a Partitioner that does what you
> describe. I started implementing one but didn't have time to finish it. I
> think the main difficulty is in properly dealing with partition ownership
> changes in Cassandra… if you are maintaining state in Flink and the
> partitioning changes, your job might produce inaccurate output. If, on the
> other hand, you are only using the partitioner just before the output,
> dynamic partitioning changes might be ok.
>
>
> From: kant kodali <kanth...@gmail.com>
> Date: Thursday, October 27, 2016 at 3:17 AM
> To: <user@flink.apache.org>
> Subject: Can we do batch writes on cassandra using flink while leveraging
> the locality?
>
> Can we do batch writes on Cassandra using Flink while leveraging the
> locality? For example the batch writes in Cassandra will put pressure on
> the coordinator but since the connectors are built by leveraging the
> locality I was wondering if we could do batch of writes on a node where the
> batch belongs?
>


Can we do batch writes on cassandra using flink while leveraging the locality?

2016-10-27 Thread kant kodali
Can we do batch writes on Cassandra using Flink while leveraging locality? For
example, batch writes in Cassandra will put pressure on the coordinator, but
since the connectors are built to leverage locality, I was wondering whether we
could do a batch of writes on the node where the batch belongs.