Re: Filtering lines in parquet

2021-03-12 Thread Avi Levi
Cool, thanks!

On Fri, Mar 12, 2021, 13:15 Arvid Heise  wrote:

> Hi Avi,
>
> thanks for clarifying.
>
> It seems like it's not possible to parse Parquet in Flink without knowing
> the schema. What I'd do is to parse the metadata while setting up the job
> and then pass it to the input format:
>
> ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
> FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
> MessageType fileSchema = fileMetaData.getSchema();
>
> Quite possibly that's what Spark is doing under the hood. If you open a ticket
> with a feature request, we will add it in the future.
>
> On Thu, Mar 11, 2021 at 6:26 PM Avi Levi  wrote:
>
>> Hi Arvid,
>> assuming that I have A0, B0, C0 parquet files with different schemas and a
>> common field *ID*, I want to write them to A1, B2, C3 files respectively.
>> My problem is that I do not want my code to know the full schema; I just
>> want to filter using the ID field and write the remaining lines, unchanged,
>> to the destination file. Each source file should have a matching destination
>> file. I tried to implement it using the ParquetInputFormat, but I need to
>> define the schema (MessageType) in advance.
>>
>> class ParquetInput(path: Path, messageType: MessageType)
>>     extends ParquetInputFormat[Row](path, messageType) {
>>
>> I am looking for a way for my code to be agnostic to the schema and to only
>> know the "ID" field (just like in Spark), e.g. *val filtered =
>> rawsDF.filter(col("id") != "123")*
>>
>> Thanks
>> Avi
>>
>> On Thu, Mar 11, 2021 at 2:53 PM Arvid Heise  wrote:
>>
>>> Hi Avi,
>>>
>>> I'm not entirely sure I understand the question. Let's say you have
>>> source A, B, C all with different schema but all have an id. You could use
>>> the ParquetMapInputFormat that provides a map of the records and just use a
>>> map-lookup.
>>>
>>> However, I'm not sure how you want to write these records with different
>>> schema into the same parquet file. Maybe, you just want to extract the
>>> common fields of A, B, C? Then you can also use Table API and just declare
>>> the fields that are common.
>>>
>>> Or do you have sink A, B, C and actually 3 separate topologies?
>>>
>>> On Wed, Mar 10, 2021 at 10:50 AM Avi Levi  wrote:
>>>
>>>> Hi all,
>>>> I am trying to filter lines from parquet files. The problem is that they
>>>> have different schemas; however, the field that I am using to filter
>>>> exists in all schemas.
>>>> In Spark this is quite straightforward:
>>>>
>>>> *val filtered = rawsDF.filter(col("id") != "123")*
>>>>
>>>> I tried to do it in Flink by extending the ParquetInputFormat, but in
>>>> this case I need the schema (MessageType) and have to implement the
>>>> convert method, which I want to avoid since I do not want to convert the
>>>> line (I want to write it as-is to another parquet file).
>>>>
>>>> Any ideas ?
>>>>
>>>> Cheers
>>>> Avi
>>>>
>>>>


Re: Filtering lines in parquet

2021-03-11 Thread Avi Levi
Hi Arvid,
assuming that I have A0, B0, C0 parquet files with different schemas and a
common field *ID*, I want to write them to A1, B2, C3 files respectively. My
problem is that I do not want my code to know the full schema; I just want to
filter using the ID field and write the remaining lines, unchanged, to the
destination file. Each source file should have a matching destination file.
I tried to implement it using the ParquetInputFormat, but I need to define
the schema (MessageType) in advance.

class ParquetInput(path: Path, messageType: MessageType)
    extends ParquetInputFormat[Row](path, messageType) {

I am looking for a way for my code to be agnostic to the schema and to only
know the "ID" field (just like in Spark), e.g. *val filtered =
rawsDF.filter(col("id") != "123")*

Thanks
Avi
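
For reference, a rough Scala sketch of the footer-based approach discussed in
this thread: read the Parquet footer while setting up the job to discover the
schema, then build the input format from it and filter on the common "id"
field only. The bucket path, the use of ParquetRowInputFormat, and the
getFieldIndex lookup are illustrative assumptions, not a tested recipe.

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.ParquetRowInputFormat
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// discover the schema at job-setup time (placeholder path)
val inputPath = "s3a://bucket/A0/"
val footerReader = ParquetFileReader.open(
  HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(inputPath), new Configuration()))
val fileSchema = footerReader.getFileMetaData.getSchema // MessageType, not hard-coded
footerReader.close()

// the job itself only assumes the "id" field exists
val env = StreamExecutionEnvironment.getExecutionEnvironment
val idIndex = fileSchema.getFieldIndex("id")
env.createInput(new ParquetRowInputFormat(new Path(inputPath), fileSchema))
  .filter(row => row.getField(idIndex) != "123")

Repeating this per source file (A0, B0, C0) and attaching a matching sink per
input would keep each destination file aligned with its source.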

On Thu, Mar 11, 2021 at 2:53 PM Arvid Heise  wrote:

> Hi Avi,
>
> I'm not entirely sure I understand the question. Let's say you have source
> A, B, C all with different schema but all have an id. You could use the
> ParquetMapInputFormat that provides a map of the records and just use a
> map-lookup.
>
> However, I'm not sure how you want to write these records with different
> schema into the same parquet file. Maybe, you just want to extract the
> common fields of A, B, C? Then you can also use Table API and just declare
> the fields that are common.
>
> Or do you have sink A, B, C and actually 3 separate topologies?
>
> On Wed, Mar 10, 2021 at 10:50 AM Avi Levi  wrote:
>
>> Hi all,
>> I am trying to filter lines from parquet files. The problem is that they
>> have different schemas; however, the field that I am using to filter
>> exists in all schemas.
>> In Spark this is quite straightforward:
>>
>> *val filtered = rawsDF.filter(col("id") != "123")*
>>
>> I tried to do it in Flink by extending the ParquetInputFormat, but in this
>> case I need the schema (MessageType) and have to implement the convert
>> method, which I want to avoid since I do not want to convert the line (I
>> want to write it as-is to another parquet file).
>>
>> Any ideas ?
>>
>> Cheers
>> Avi
>>
>>


Filtering lines in parquet

2021-03-10 Thread Avi Levi
Hi all,
I am trying to filter lines from parquet files. The problem is that they
have different schemas; however, the field that I am using to filter
exists in all schemas.
In Spark this is quite straightforward:

*val filtered = rawsDF.filter(col("id") != "123")*

I tried to do it in Flink by extending the ParquetInputFormat, but in this
case I need the schema (MessageType) and have to implement the convert
method, which I want to avoid since I do not want to convert the line (I
want to write it as-is to another parquet file).

Any ideas ?

Cheers
Avi


Re: reading file from s3

2021-03-07 Thread Avi Levi
Thanks Tamir,
I was having some issues connecting from my IDE (solved) but this is really
helpful.


On Sat, Mar 6, 2021, 23:04 Tamir Sagi  wrote:

> I had a typo in my previous answer, the env name was missing an 'S'
>
> ENABLE_BUILT_IN_PLUGIN --> ENABLE_BUILT_IN_PLUGIN*S*
> once again, the value is *the plugin jar name*:
> flink-s3-fs-hadoop-<version>.jar
> The complete list can be found here
> <https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop>
>
> You can build your own Flink image and set an environment variable in it,
> or set it when you run the container.
> If you execute it locally (not in a container) in a standalone cluster,
> make sure this env variable is defined at the system level.
>
> Tamir.
> --
> *From:* Tamir Sagi 
> *Sent:* Saturday, March 6, 2021 7:33 PM
> *To:* Avi Levi ; Chesnay Schepler 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: reading file from s3
>
> Hey Avi,
>
> Do you use 'Hadoop S3 plugin' to read from S3?
>
> If yes, what is its version?
>
> If not, try to read from S3 as follows (ref
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#amazon-s3>
> )
>
>1. set an environment variable to use the hadoop plugin (it's part of the
>Flink image):
>key = ENABLE_BUILT_IN_PLUGIN
>value = flink-s3-fs-hadoop-<version>.jar
>(i.e. flink-s3-fs-hadoop-1.11.1.jar, for Flink 1.11.1)
>2. read the file from S3:
>*DataSource lines = env.readTextFile("s3://");*
>
> Tamir
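
For reference, a minimal Scala variant of the plugin-based read described
above. It assumes the bundled Hadoop S3 plugin has been enabled as in step 1
(ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-<version>.jar) and that the
bucket/path is a placeholder.

import org.apache.flink.streaming.api.scala._

object S3ReadJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // works once the S3 filesystem plugin is available on the cluster
    val lines = env.readTextFile("s3://my-bucket/path/to/input")
    lines.print()
    env.execute("read-from-s3")
  }
}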
> --
> *From:* Avi Levi 
> *Sent:* Saturday, March 6, 2021 6:59 AM
> *To:* Chesnay Schepler 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: reading file from s3
>
>
>
>
> Does anyone by any chance have a working example (of course without the
> credentials, etc.) that can be shared on GitHub? Simply reading/writing a
> file from/to S3.
> I keep struggling with this one and getting weird exceptions.
> Thanks
>
> On Thu, Mar 4, 2021 at 7:30 PM Avi Levi  wrote:
>
> Sure, this is the full exception stack trace:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.Batchi

Re: reading file from s3

2021-03-05 Thread Avi Levi
Does anyone by any chance have a working example (of course without the
credentials, etc.) that can be shared on GitHub? Simply reading/writing a
file from/to S3.
I keep struggling with this one and getting weird exceptions.
Thanks

On Thu, Mar 4, 2021 at 7:30 PM Avi Levi  wrote:

> Sure, this is the full exception stack trace:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at
> org.apache

Re: reading file from s3

2021-03-04 Thread Avi Levi
r.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.NumberFormatException: For input string: "64M"
at
java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Long.parseLong(Long.java:707)
at java.base/java.lang.Long.parseLong(Long.java:832)
at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1563)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
at com.neura.ParquetSourceFunction.run(Job.scala:45)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)

On Thu, Mar 4, 2021 at 6:02 PM Chesnay Schepler  wrote:

> Can you show us the full exception stacktrace? Intuitively I would think
> your cluster configuration contains an invalid value for some memory
> configuration option.
>
> On 3/4/2021 4:45 PM, Avi Levi wrote:
>
> Hi,
> I am pretty new. I keep struggling to read a file from S3 but am getting
> this weird exception:
> Caused by: java.lang.NumberFormatException: For input string: "64M" (if
> anyone can link me to a working GitHub example, that would be awesome). What
> am I doing wrong?
>
> This is what my code looks like:
>
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.column.page.PageReadStore
> import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
> import org.apache.parquet.hadoop.ParquetFileReader
> import org.apache.parquet.hadoop.util.HadoopInputFile
> import org.apache.parquet.io.ColumnIOFactory
>
> class ParquetSourceFunction extends SourceFunction[String] {
>   override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
>     val inputPath = "s3a://foo/year=2000/month=02/"
>     val conf = new Configuration()
>     conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
>     val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
>     val readFooter = ParquetFileReader.open(hadoopFile)
>     val metadata = readFooter.getFileMetaData
>     val schema = metadata.getSchema
>     val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
>     var pages: PageReadStore = null
>     try {
>       while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
>         val rows = pages.getRowCount
>         val columnIO = new ColumnIOFactory().getColumnIO(schema)
>         val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
>         (0L until rows).foreach { _ =>
>           val group = recordReader.read()
>           val myString = group.getString("field_name", 0)
>           ctx.collect(myString)
>         }
>       }
>     }
>   }
>
>   override def cancel(): Unit = ???
> }
>
> object Job {
>   def main(args: Array[String]): Unit = {
>     // set up the execution environment
>     lazy val env = StreamExecutionEnviron

reading file from s3

2021-03-04 Thread Avi Levi
Hi,
I am pretty new. I keep struggling to read a file from S3 but am getting this
weird exception:
Caused by: java.lang.NumberFormatException: For input string: "64M" (if
anyone can link me to a working GitHub example, that would be awesome). What
am I doing wrong?

This is what my code looks like:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String]{
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val inputPath = "s3a://foo/year=2000/month=02/"
val conf = new Configuration()
conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
val readFooter = ParquetFileReader.open(hadoopFile)
val metadata = readFooter.getFileMetaData
val schema = metadata.getSchema
val parquetFileReader = new ParquetFileReader(conf, metadata, new
Path(inputPath), readFooter.getRowGroups, schema.getColumns)
var pages: PageReadStore = null

try {
  while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
val rows = pages.getRowCount
val columnIO = new ColumnIOFactory().getColumnIO(schema)
val recordReader = columnIO.getRecordReader(pages, new
GroupRecordConverter(schema))
(0L until rows).foreach { _ =>
  val group = recordReader.read()
  val myString = group.getString("field_name", 0)
  ctx.collect(myString)
}
  }
}
  }

  override def cancel(): Unit = ???
}

object Job {
  def main(args: Array[String]): Unit = {
// set up the execution environment
lazy val env = StreamExecutionEnvironment.getExecutionEnvironment


lazy val stream = env.addSource(new ParquetSourceFunction)
stream.print()
env.execute()
  }
}

sbt dependencies :


ThisBuild / scalaVersion := "2.12.1"

val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))

lazy val root = (project in file(".")).
  settings(
libraryDependencies ++= flinkDependencies,
libraryDependencies ++= s3Dependencies,
libraryDependencies ++= serializationDependencies,
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,
libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
libraryDependencies += "org.apache.flink" %%
"flink-table-planner-blink" % "1.12.1" //% "provided"
  )


Getting Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler

2020-12-28 Thread Avi Levi
I am trying to aggregate all records in a time window. This is my
ProcessAllWindowFunction:

case class SimpleAggregate(elms: List[String])

class AggregateLogs extends ProcessAllWindowFunction[String,
SimpleAggregate, TimeWindow ] {
  override def process(context: Context, elements: Iterable[String],
out: Collector[SimpleAggregate]): Unit = {
val es: List[String] = elements.toList
val record = SimpleAggregate(es)
out.collect(record)
  }
}

But I am getting this exception. Why?

Exception in thread "main" java.util.concurrent.ExecutionException:
scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot
initialize the compiler due to java.lang.BootstrapMethodError:
java.lang.NoSuchMethodError:
scala.collection.immutable.List.$anonfun$flatMap$1$adapted(Lscala/runtime/BooleanRef;Lscala/runtime/ObjectRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)Ljava/lang/Object;
at
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)
at
com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2$$anon$3.<init>(HandleFinancialJob.scala:52)
at
com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2.createSerializer(HandleFinancialJob.scala:52)
at
com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2.createSerializer(HandleFinancialJob.scala:52)
at
com.neosec.handlefinancial.HandleFinancialJob$$anon$1.$anonfun$createSerializer$1(HandleFinancialJob.scala:52)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at
com.neosec.handlefinancial.HandleFinancialJob$$anon$1.createSerializer(HandleFinancialJob.scala:52)
at
org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:864)
at
org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:308)
at
org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:293)
at
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:680)
at
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:253)
at
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:212)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
at
com.neosec.handlefinancial.HandleFinancialJob$.delayedEndpoint$com$neosec$handlefinancial$HandleFinancialJob$1(HandleFinancialJob.scala:60)
at
com.neosec.handlefinancial.HandleFinancialJob$delayedInit$body.apply(HandleFinancialJob.scala:20)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1$adapted(App.scala:80)
at scala.collection.immutable.List.foreach(List.scala:431)
at scala.App.main(App.scala:80)
at scala.App.main$(App.scala:78)
at
com.neosec.handlefinancial.HandleFinancialJob$.main(HandleFinancialJob.scala:20)
at
com.neosec.handlefinancial.HandleFinancialJob.main(HandleFinancialJob.scala)
Caused by: scala.tools.reflect.ToolBoxError: reflective compilation has
failed: cannot initialize the compiler due to
java.lang.BootstrapMethodError: java.lang.NoSuchMethodError:

Just published connect-flink-with-kinesis-kinesalite-using-scala

2020-12-23 Thread Avi Levi
Hi,
After stumbling a little with connecting to Kinesis/Kinesalite, I just
published connect-flink-with-kinesis-kinesalite-using-scala

Hopefully it will assist someone.
I would love to get your input.

Cheers
Avi


Re: Connecting to kinesis with mfa

2020-12-16 Thread Avi Levi
Awesome, thanks! Looks good.


On Wed, Dec 16, 2020 at 12:55 PM Cranmer, Danny  wrote:

> Hey Avi,
>
>
>
> I have reproduced and found a solution. The issue is not MFA; it is that the
> BASIC credential provider is not using the token:
>
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L181
>
>
>
> If you want to supply AK/SK/Token then you will have to use another
> CredentialProviderType; below is an example using SYS_PROP. We could
> improve the Kinesis connector to detect the session token and construct a
> BasicSessionCredentials:
>
>
> https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/auth/BasicSessionCredentials.java
>
>
>
> Properties systemProperties = System.getProperties();
> systemProperties.setProperty("aws.accessKeyId", accessKey);
> systemProperties.setProperty("aws.secretKey", secretKey);
> systemProperties.setProperty("aws.sessionToken", sessionToken);
>
> Properties producerConfig = new Properties();
> producerConfig.setProperty(AWSConfigConstants.AWS_REGION, REGION);
> producerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
> "SYS_PROP");
>
>
>
> I will add this to the Jira also. Let me know if you have any issues.
>
>
>
> Thanks,
>
> Danny
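
For reference, a Scala sketch of the SYS_PROP workaround described above,
wired into the FlinkKinesisProducer from the earlier messages. The key, token,
region, and stream values are placeholders.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

// hand the session credentials to the AWS SDK via system properties (placeholders)
System.setProperty("aws.accessKeyId", "<access-key>")
System.setProperty("aws.secretKey", "<secret-key>")
System.setProperty("aws.sessionToken", "<session-token>")

// tell the connector to read credentials from system properties instead of BASIC
val producerConfig = new Properties()
producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "SYS_PROP")

val env = StreamExecutionEnvironment.getExecutionEnvironment
val producer = new FlinkKinesisProducer[String](new SimpleStringSchema(), producerConfig)
producer.setDefaultStream("<output-stream>")
producer.setDefaultPartition("0")
env.fromElements("a", "b", "c").addSink(producer)
env.execute()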
>
>
>
> *From: *Avi Levi 
> *Date: *Wednesday, 16 December 2020 at 08:09
> *To: *Robert Metzger 
> *Cc: *user 
> *Subject: *RE: [EXTERNAL] Connecting to kinesis with mfa
>
>
>
>
>
>
> Thanks Robert, I actually tried all of the above but got to the same
> unfortunate result
>
>
>
> On Wed, Dec 16, 2020 at 8:24 AM Robert Metzger 
> wrote:
>
> Hey Avi,
>
>
>
> Maybe providing secret/access key + session token doesn't work, and you
> need to provide either one of them?
>
>
> https://docs.aws.amazon.com/credref/latest/refdocs/setting-global-aws_session_token.html
>
>
>
> I'll also ping some AWS contributors active in Flink to take a look at
> this.
>
>
>
> Best,
>
> Robert
>
>
>
> On Tue, Dec 15, 2020 at 10:07 AM Avi Levi  wrote:
>
> Hi guys,
>
> we are struggling to connect to Kinesis when MFA is activated. I
> configured everything according to the documentation but am still getting
> an exception:
>
>
> val producerConfig = new Properties()
>
> producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
>
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey)
>
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> awsSecretAccessKey)
>
> producerConfig.put(com.amazonaws.auth.profile.internal.ProfileKeyConstants.AWS_SESSION_TOKEN,
>  awsSessionToken)
>
> with a very simple pipeline :
>
>
>
> val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
> producerConfig)
>
> producer.setFailOnError(true)
>
> producer.setDefaultStream(outputStreamName)
>
> producer.setDefaultPartition("0")
>
> env.fromElements("a", "b", "c").addSink(producer)
>
> env.execute()
>
> the results with:
>
> 15:30:44,292 WARN 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-12-14 15:30:44.292188] [0xcb5f][0x7512c000] [warning] 
> [AWS Log: WARN](AWSClient)If the signature check failed. This could be 
> because of a time skew. Attempting to adjust the signer.
>
> 15:30:44,378 INFO 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-12-14 15:30:44.377865] [0xcb5b][0x782c1000] [info] 
> [shard_map.cc:87] Updating shard map for stream "ExampleOutputStream"
>
> 15:30:44,396 WARN 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-12-14 15:30:44.396208] [0xcb55][0x72a3e000] [warning] 
> [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 
> 'UnrecognizedClientException': The security token included in the request is 
> invalid.
>
> 15:30:44,396 ERROR 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-12-14 15:30:44.396256] [0xcb55][0x72a3e000] [error] [AWS 
> Log: ERROR](AWSClient)HTTP response code: 400
>
> Exception name: UnrecognizedClientException
>
> Error message: The security token included in the request is invalid.
>
> 6 response headers:
>
> connection : close
>
> I double-checked that all keys are correct, using the same keys that work
> perfectly when I execute commands from the CLI. Also, when removing the MFA
> from Kinesis, the pipeline works as expected. Finally, I opened a ticket
> <https://issues.apache.org/jira/browse/FLINK-20602> for this as well.
>
>


Re: Connecting to kinesis with mfa

2020-12-16 Thread Avi Levi
Thanks Robert, I actually tried all of the above but got to the same
unfortunate result

On Wed, Dec 16, 2020 at 8:24 AM Robert Metzger  wrote:

> Hey Avi,
>
> Maybe providing secret/access key + session token doesn't work, and you
> need to provide either one of them?
>
> https://docs.aws.amazon.com/credref/latest/refdocs/setting-global-aws_session_token.html
>
> I'll also ping some AWS contributors active in Flink to take a look at
> this.
>
> Best,
> Robert
>
> On Tue, Dec 15, 2020 at 10:07 AM Avi Levi  wrote:
>
>> Hi guys,
>> we are struggling to connect to Kinesis when MFA is activated. I
>> configured everything according to the documentation but am still getting
>> an exception:
>>
>>
>> val producerConfig = new Properties()
>> producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey)
>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
>> awsSecretAccessKey)
>> producerConfig.put(com.amazonaws.auth.profile.internal.ProfileKeyConstants.AWS_SESSION_TOKEN,
>>  awsSessionToken)
>>
>> with a very simple pipeline :
>>
>>
>>
>> val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
>> producerConfig)
>> producer.setFailOnError(true)
>> producer.setDefaultStream(outputStreamName)
>> producer.setDefaultPartition("0")
>> env.fromElements("a", "b", "c").addSink(producer)
>> env.execute()
>>
>> the results with:
>>
>> 15:30:44,292 WARN 
>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>>  - [2020-12-14 15:30:44.292188] [0xcb5f][0x7512c000] [warning] 
>> [AWS Log: WARN](AWSClient)If the signature check failed. This could be 
>> because of a time skew. Attempting to adjust the signer.
>> 15:30:44,378 INFO 
>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>>  - [2020-12-14 15:30:44.377865] [0xcb5b][0x782c1000] [info] 
>> [shard_map.cc:87] Updating shard map for stream "ExampleOutputStream"
>> 15:30:44,396 WARN 
>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>>  - [2020-12-14 15:30:44.396208] [0xcb55][0x72a3e000] [warning] 
>> [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 
>> 'UnrecognizedClientException': The security token included in the request is 
>> invalid.
>> 15:30:44,396 ERROR 
>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>>  - [2020-12-14 15:30:44.396256] [0xcb55][0x72a3e000] [error] 
>> [AWS Log: ERROR](AWSClient)HTTP response code: 400
>> Exception name: UnrecognizedClientException
>> Error message: The security token included in the request is invalid.
>> 6 response headers:
>> connection : close
>>
>> I double-checked that all keys are correct, using the same keys that work
>> perfectly when I execute commands from the CLI. Also, when removing the MFA
>> from Kinesis, the pipeline works as expected. Finally, I opened a ticket
>> <https://issues.apache.org/jira/browse/FLINK-20602> for this as well.
>>
>


Connecting to kinesis with mfa

2020-12-15 Thread Avi Levi
Hi guys,
we are struggling to connect to Kinesis when MFA is activated. I
configured everything according to the documentation but am still getting
an exception:


val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey)
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretAccessKey)
producerConfig.put(com.amazonaws.auth.profile.internal.ProfileKeyConstants.AWS_SESSION_TOKEN,
awsSessionToken)

with a very simple pipeline :



val producer = new FlinkKinesisProducer(new SimpleStringSchema(),
producerConfig)
producer.setFailOnError(true)
producer.setDefaultStream(outputStreamName)
producer.setDefaultPartition("0")
env.fromElements("a", "b", "c").addSink(producer)
env.execute()

the results with:

15:30:44,292 WARN
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
- [2020-12-14 15:30:44.292188] [0xcb5f][0x7512c000]
[warning] [AWS Log: WARN](AWSClient)If the signature check failed.
This could be because of a time skew. Attempting to adjust the signer.
15:30:44,378 INFO
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
- [2020-12-14 15:30:44.377865] [0xcb5b][0x782c1000] [info]
[shard_map.cc:87] Updating shard map for stream "ExampleOutputStream"
15:30:44,396 WARN
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
- [2020-12-14 15:30:44.396208] [0xcb55][0x72a3e000]
[warning] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError
'UnrecognizedClientException': The security token included in the
request is invalid.
15:30:44,396 ERROR
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
- [2020-12-14 15:30:44.396256] [0xcb55][0x72a3e000]
[error] [AWS Log: ERROR](AWSClient)HTTP response code: 400
Exception name: UnrecognizedClientException
Error message: The security token included in the request is invalid.
6 response headers:
connection : close

I double-checked that all keys are correct, using the same keys that work
perfectly when I execute commands from the CLI. Also, when removing the MFA
from Kinesis, the pipeline works as expected. Finally, I opened a ticket
<https://issues.apache.org/jira/browse/FLINK-20602> for this as well.


Never terminating test ...

2020-12-13 Thread Avi Levi
I have the following test. The problem is that it doesn't end, meaning it
never reaches the assertion point. What am I doing wrong?

"kinesis consumer" should "consume message from kinesis stream" in {
import ExecutionContext.Implicits.global
val sampleData = Seq("a", "b", "c")
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
env.addSource(new FlinkKinesisConsumer[String](
 "SampleStream", new SimpleStringSchema, consumerConfig))
.addSink(new TestSink[String])

Future(createSampleDataStream(sampleData)) //publish to kinesis stream
env.execute()
TestSink.values should contain theSameElementsAs (sampleData) //not executed
 }
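
One possible reason the assertion is never reached is that env.execute()
blocks until the streaming job finishes, and a Kinesis source is unbounded.
A rough sketch of a workaround, assuming Flink 1.10+ (executeAsync) and a
thread-safe TestSink; the sleep is a crude placeholder for a proper polling
assertion:

"kinesis consumer" should "consume message from kinesis stream" in {
  val sampleData = Seq("a", "b", "c")
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.addSource(new FlinkKinesisConsumer[String](
      "SampleStream", new SimpleStringSchema, consumerConfig))
    .addSink(new TestSink[String])

  // submit without blocking so the test thread can publish and assert
  val jobClient = env.executeAsync("kinesis-consumer-test")
  createSampleDataStream(sampleData)      // publish to the kinesis stream
  Thread.sleep(10000)                     // crude wait; replace with a polling assertion
  try {
    TestSink.values should contain theSameElementsAs sampleData
  } finally {
    jobClient.cancel().get()              // stop the never-ending streaming job
  }
}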


Re: Putting record on kinesis (actually kinesalite) results with The security token included in the request is invalid.

2020-12-11 Thread Avi Levi
Hi,
it seems that the issue is related to MFA being involved.
I think that this is a Flink issue, because when sending commands (e.g. put
record, etc.) from the CLI (while MFA is activated, of course), it works fine
(meaning the credentials and security token are fine).
Update:
after disabling MFA, the FlinkKinesisConsumer/Producer works fine, which
also makes me think that this is a Flink issue.

Best
Avi

On Fri, Dec 11, 2020 at 2:03 PM Matthias Pohl 
wrote:

> True, I got this wrong. Do you have any reason to assume that it's a Flink
> issue? The configuration looks correct (relying on the Flink docs [1]
> here). Have you considered asking in the AWS community for help?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#kinesis-producer
>
> On Thu, Dec 10, 2020 at 6:31 PM Avi Levi  wrote:
>
>> Hi,
>> Thanks for your reply. The problem is actually with the
>> FlinkKinesisProducer and not the consumer (I did consume from the
>> stream successfully). The keys are valid.
>>
>> On Thu, Dec 10, 2020 at 6:53 PM Matthias Pohl 
>> wrote:
>>
>>> Hi Avi,
>>> thanks for reaching out to the Flink community. I haven't worked with
>>> the KinesisConsumer. Unfortunately, I cannot judge whether there's
>>> something missing in your setup. But first of all: Could you confirm that
>>> the key itself is valid? Did you try to use it in other cases?
>>>
>>> Best,
>>> Matthias
>>>
>>> On Thu, Dec 10, 2020 at 12:48 PM Avi Levi  wrote:
>>>
>>>> Hi,
>>>> Any help here will be greatly appreciated; I am about to throw in the
>>>> towel, very frustrating...
>>>> I am trying to put a record on Kinesalite with the following configuration:
>>>>
>>>> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
>>>>  "true")
>>>>   
>>>> System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
>>>>  "true")
>>>>   System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>>>>   
>>>> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,"true")
>>>>
>>>>   val producerConfig = new Properties()
>>>>   producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>>>>   producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
>>>>   producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
>>>>   producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
>>>>   producerConfig.put( "VerifyCertificate", "false")
>>>>
>>>> However putting a record on the stream :
>>>>
>>>>   val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
>>>> producerConfig)
>>>>   producer.setFailOnError(true)
>>>>   producer.setDefaultStream(outputStreamName)
>>>>   producer.setDefaultPartition("0")
>>>>
>>>>   val kinesis =
>>>> env.addSource(new FlinkKinesisConsumer[String](
>>>>   inputStreamName,new SimpleStringSchema, consumerConfig))
>>>>   .addSink(producer)
>>>>
>>>> yields:
>>>>
>>>> Exception name: UnrecognizedClientException
>>>> Error message: The security token included in the request is invalid.
>>>> 6 response headers:
>>>> connection : close
>>>> content-length : 107
>>>> content-type : application/x-amz-json-1.1
>>>>
>>>>
>>>> ➜  ~ cat ~/.aws/credentials
>>>> [default]
>>>> aws_access_key_id = x
>>>> aws_secret_access_key = x
>>>> region = us-east-1
>>>>
>>>


Re: Putting record on kinesis (actually kinesalite) results with The security token included in the request is invalid.

2020-12-10 Thread Avi Levi
Hi,
Thanks for your reply. The problem is actually with the
FlinkKinesisProducer and not the consumer (I did consume from the stream
successfully). The keys are valid.

On Thu, Dec 10, 2020 at 6:53 PM Matthias Pohl 
wrote:

> Hi Avi,
> thanks for reaching out to the Flink community. I haven't worked with the
> KinesisConsumer. Unfortunately, I cannot judge whether there's something
> missing in your setup. But first of all: Could you confirm that the key
> itself is valid? Did you try to use it in other cases?
>
> Best,
> Matthias
>
> On Thu, Dec 10, 2020 at 12:48 PM Avi Levi  wrote:
>
>> Hi,
>> Any help here will be greatly appreciated; I am about to throw in the
>> towel, very frustrating...
>> I am trying to put a record on Kinesalite with the following configuration:
>>
>> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
>>  "true")
>>   
>> System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, 
>> "true")
>>   System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>>   
>> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,"true")
>>
>>   val producerConfig = new Properties()
>>   producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>>   producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
>>   producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
>>   producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
>>   producerConfig.put( "VerifyCertificate", "false")
>>
>> However putting a record on the stream :
>>
>>   val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
>> producerConfig)
>>   producer.setFailOnError(true)
>>   producer.setDefaultStream(outputStreamName)
>>   producer.setDefaultPartition("0")
>>
>>   val kinesis =
>> env.addSource(new FlinkKinesisConsumer[String](
>>   inputStreamName,new SimpleStringSchema, consumerConfig))
>>   .addSink(producer)
>>
>> yields:
>>
>> Exception name: UnrecognizedClientException
>> Error message: The security token included in the request is invalid.
>> 6 response headers:
>> connection : close
>> content-length : 107
>> content-type : application/x-amz-json-1.1
>>
>>
>> ➜  ~ cat ~/.aws/credentials
>> [default]
>> aws_access_key_id = x
>> aws_secret_access_key = x
>> region = us-east-1
>>
>


Putting record on kinesis (actually kinesalite) results with The security token included in the request is invalid.

2020-12-10 Thread Avi Levi
Hi,
Any help here will be greatly appreciated; I am about to throw in the towel,
very frustrating...
I am trying to put a record on Kinesalite with the following configuration:

System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
 "true")
  System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, 
"true")
  System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
  
System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,"true")
  
  val producerConfig = new Properties()
  producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
  producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
  producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
  producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
  producerConfig.put( "VerifyCertificate", "false")
However putting a record on the stream :
  val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
producerConfig)
  producer.setFailOnError(true)
  producer.setDefaultStream(outputStreamName)
  producer.setDefaultPartition("0")

  val kinesis =
env.addSource(new FlinkKinesisConsumer[String](
  inputStreamName,new SimpleStringSchema, consumerConfig))
  .addSink(producer)
yields:
Exception name: UnrecognizedClientException
Error message: The security token included in the request is invalid.
6 response headers:
connection : close
content-length : 107
content-type : application/x-amz-json-1.1

➜  ~ cat ~/.aws/credentials
[default]
aws_access_key_id = x
aws_secret_access_key = x
region = us-east-1

Re: testing - syncing data timeline

2019-12-25 Thread Avi Levi
I am not sure I see how it is simpler. #2 is a time window per day; it
emits the highest hour for that day. #4 is not a time window; it keeps a
history of several days. If I want to put the logic of #2 there, I will need
to manage it with timers, correct?

On Thu, Dec 26, 2019 at 6:28 AM Kurt Young  wrote:

> Hi,
>
> You can merge the logic of #2 into #4, it will be much simpler.
>
> Best,
> Kurt
>
>
> On Wed, Dec 25, 2019 at 7:36 PM Avi Levi  wrote:
>
>>  Hi ,
>>
>> I have the following pipeline:
>> 1. a single-hour window that counts the number of records
>> 2. a single-day window that accepts the aggregated data from #1 and emits
>> the highest hour count of that day
>> 3. a union of #1 + #2
>> 4. a logic operator that accepts the data from #3, keeps a ListState of
>> #2, applies some logic on #1 based on that state (e.g. comparing a single
>> hour with the history of the max hours over the last X days), and emits
>> the result
>>
>> The timestampsAndWatermarks assigner is
>> using BoundedOutOfOrdernessTimestampExtractor (event time), and I allow a
>> lateness of 3 hours.
>>
>> The problem is that when I try to unit test the whole pipeline, the
>> data from #1 reaches #4 before the latter accepts the data from #3, hence
>> it doesn't have any state yet (the state is always empty when the stream
>> from #1 arrives).
>> My source in the tests is a collection that represents the records.
>> Is there any way I can solve this?
>> [image: Screen Shot 2019-12-25 at 13.04.17.png]
>> I appreciate any help you can provide
>> Cheers
>> Avi
>>
>>
>>


testing - syncing data timeline

2019-12-25 Thread Avi Levi
 Hi ,

I have the following pipeline:
1. a single-hour window that counts the number of records
2. a single-day window that accepts the aggregated data from #1 and emits the
highest hour count of that day
3. a union of #1 + #2
4. a logic operator that accepts the data from #3, keeps a ListState of #2,
applies some logic on #1 based on that state (e.g. comparing a single hour
with the history of the max hours over the last X days), and emits the result

The timestampsAndWatermarks assigner is using BoundedOutOfOrdernessTimestampExtractor
(event time), and I allow a lateness of 3 hours.

The problem is that when I try to unit test the whole pipeline, the
data from #1 reaches #4 before the latter accepts the data from #3, hence it
doesn't have any state yet (the state is always empty when the stream from #1
arrives).
My source in the tests is a collection that represents the records.
Is there any way I can solve this?
[image: Screen Shot 2019-12-25 at 13.04.17.png]
I appreciate any help you can provide
Cheers
Avi


Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
Thanks Alexander,
Will do.

Cheers

On Mon, Dec 2, 2019 at 3:23 PM Alexander Fedulov 
wrote:

> Hi Avi,
>
> In this situation I would propose to step back and use a lower level API
> -  ProcessFunction. You can put your window elements into the Flink-managed
> List state and handle clean-up/triggering and periodic state mutations
> exactly as needed by implementing some additional timers logic.
>
> Best regards,
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> +49 1514 6265796
>
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward
> <https://flink-forward.org/>
> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Tony) Cheng
>
>
>
> On Mon, Dec 2, 2019 at 1:16 PM Avi Levi  wrote:
>
>> I think the only way to do this is to add a keyed operator downstream that
>> will hold the global state. Not ideal, but I don't see any other option.
>>
>> On Mon, Dec 2, 2019 at 1:43 PM Avi Levi  wrote:
>>
>>> Hi Vino,
>>> I have a global state that I need to mutate every X hours (e.g. clean
>>> that state or update its value). I thought that there might be an option
>>> to set a timer using the TimerService with its own time interval, detached
>>> from the window interval.
>>>
>>> On Mon, Dec 2, 2019 at 10:59 AM vino yang  wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> Firstly, let's clarify: is the "timer" you mentioned the timer of the
>>>> window, or a timer you want to register to trigger some action?
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>>
>>>> On Mon, Dec 2, 2019 at 4:11 PM Avi Levi  wrote:
>>>>
>>>>> Hi,
>>>>> Is there a way to fire a timer in a ProcessWindowFunction? I would like
>>>>> to mutate the global state on a periodic basis.
>>>>>
>>>>>
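
For reference, an illustrative Scala sketch of the lower-level approach
suggested above: a KeyedProcessFunction that keeps elements in Flink-managed
ListState and registers processing-time timers to mutate/clean that state
periodically. The class name, the keying, and the one-hour interval are
assumptions, not the poster's actual job.

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class TimedStateFunction extends KeyedProcessFunction[String, String, String] {

  @transient private var elements: ListState[String] = _
  private val interval = 60 * 60 * 1000L // placeholder: mutate/clean once per hour

  override def open(parameters: Configuration): Unit =
    elements = getRuntimeContext.getListState(
      new ListStateDescriptor[String]("buffered-elements", classOf[String]))

  override def processElement(value: String,
                              ctx: KeyedProcessFunction[String, String, String]#Context,
                              out: Collector[String]): Unit = {
    elements.add(value)
    // schedule a state mutation; timers are de-duplicated per key and timestamp
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + interval)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // the periodic mutation: here simply clear the buffered elements
    elements.clear()
  }
}

In a real job this would sit behind a keyBy(...) so the state and timers are
scoped per key.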


Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
I think the only way to do this is to add a keyed operator downstream that
will hold the global state. Not ideal, but I don't see any other option.

On Mon, Dec 2, 2019 at 1:43 PM Avi Levi  wrote:

> Hi Vino,
> I have a global state that I need to mutate every X hours (e.g. clean that
> state or update its value). I thought that there might be an option to set
> a timer using the TimerService with its own time interval, detached from the
> window interval.
>
> On Mon, Dec 2, 2019 at 10:59 AM vino yang  wrote:
>
>> Hi Avi,
>>
>> Firstly, let's clarify: is the "timer" you mentioned the timer of the
>> window, or a timer you want to register to trigger some action?
>>
>> Best,
>> Vino
>>
>>
>> On Mon, Dec 2, 2019 at 4:11 PM Avi Levi  wrote:
>>
>>> Hi,
>>> Is there a way to fire a timer in a ProcessWindowFunction? I would like
>>> to mutate the global state on a periodic basis.
>>>
>>>


Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
Hi Vino,
I have a global state that I need to mutate every X hours (e.g. clean that
state or update its value). I thought that there might be an option to set
a timer using the TimerService with its own time interval, detached from the
window interval.

On Mon, Dec 2, 2019 at 10:59 AM vino yang  wrote:

> Hi Avi,
>
> Firstly, let's clarify: is the "timer" you mentioned the timer of the
> window, or a timer you want to register to trigger some action?
>
> Best,
> Vino
>
>
> On Mon, Dec 2, 2019 at 4:11 PM Avi Levi  wrote:
>
>> Hi,
>> Is there a way to fire a timer in a ProcessWindowFunction? I would like to
>> mutate the global state on a periodic basis.
>>
>>


Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
Hi,
Is there a way to fire a timer in a ProcessWindowFunction? I would like to
mutate the global state on a periodic basis.


Re: Idiomatic way to split pipeline

2019-12-01 Thread Avi Levi
Thanks Arvid,
The problem is that I will get an exception about a non-unique uid on the
*stream*.
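
For reference, a minimal Scala sketch of the two-subgraph approach described
in the reply below, with a distinct uid() per operator so the non-unique-uid
error does not occur. The operator logic and uid strings are illustrative.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4).map(_ * 2).uid("shared-operator")

// two independent consumers of the same operator's output, each with its own uid
stream.filter(_ % 4 == 0).uid("branch-1-filter").print()
stream.map(_.toString).uid("branch-2-map").print()

env.execute("split-by-reuse")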

On Thu, Nov 28, 2019 at 2:45 PM Arvid Heise  wrote:

> Hi Avi,
>
> it seems to me that you do not really need any split feature. As far
> as I can see in your picture, you want to apply two different windows on the
> same input data.
>
> In that case you simply use two different subgraphs.
>
> stream = ...
>
> stream1 = stream.window(...).addSink()
>
> stream2 = stream.window(...).addSink()
>
> In Flink, you can compose arbitrary directed acyclic graphs, so consuming
> the output of one operator on several downstream operators is completely
> normal.
>
> Best,
>
> Arvid
>
> On Mon, Nov 25, 2019 at 10:50 AM Avi Levi  wrote:
>
>> Thanks, I'll check it out.
>>
>> On Mon, Nov 25, 2019 at 11:46 AM vino yang  wrote:
>>
>>> Hi Avi,
>>>
>>> The side output provides a superset of split's functionality. So
>>> anything that can be implemented via split can also be implemented via
>>> side output. [1]
>>>
>>> Best,
>>> Vino
>>>
>>> [1]:
>>> https://stackoverflow.com/questions/51440677/apache-flink-whats-the-difference-between-side-outputs-and-split-in-the-data
>>> <https://stackoverflow.com/questions/51440677/apache-flink-whats-the-difference-between-side-outputs-and-split-in-the-data>
>>>
>>> On Mon, Nov 25, 2019 at 5:32 PM Avi Levi  wrote:
>>>
>>>> Thank you for your quick reply, I appreciate that. But this is not
>>>> exactly a "side output" per se; it is simple splitting. IIUC, the side
>>>> output is more for splitting the records by something that differentiates
>>>> them (lateness, value, etc.). I thought there was something more
>>>> idiomatic, but if this is it, then I will go with that.
>>>>
>>>> On Mon, Nov 25, 2019 at 10:42 AM vino yang 
>>>> wrote:
>>>>
>>>>> Hi Avi,
>>>>>
>>>>> As the doc of DataStream#split said, you can use the "side output"
>>>>> feature to replace it.[1]
>>>>>
>>>>> [1]:
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html>
>>>>>
>>>>> Best,
>>>>> Vino
>>>>>
>>>>> Avi Levi wrote on Mon, Nov 25, 2019 at 4:12 PM:
>>>>>
>>>>>> Hi,
>>>>>> I want to split the output of one of the operators to two pipelines.
>>>>>> Since the *split* method is deprecated, what is the idiomatic way to
>>>>>> do that without duplicating the operator ?
>>>>>>
>>>>>> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>>>>>>
>>>>>>
>>>>>>


Re: Idiomatic way to split pipeline

2019-11-25 Thread Avi Levi
Thanks, I'll check it out.

On Mon, Nov 25, 2019 at 11:46 AM vino yang  wrote:

> Hi Avi,
>
> The side output provides a superset of split's functionality. So anything
> can be implemented via split also can be implemented via side output.[1]
>
> Best,
> Vino
>
> [1]:
> https://stackoverflow.com/questions/51440677/apache-flink-whats-the-difference-between-side-outputs-and-split-in-the-data
> <https://stackoverflow.com/questions/51440677/apache-flink-whats-the-difference-between-side-outputs-and-split-in-the-data>
>
> Avi Levi wrote on Mon, Nov 25, 2019 at 5:32 PM:
>
>> Thank you for your quick reply, I appreciate that. But this is not
>> exactly "side output" per se; it is simple splitting. IIUC the side output
>> is more for splitting the records by something that differentiates them
>> (lateness, value, etc.). I thought there was something more idiomatic, but if this is
>> it, then I will go with that.
>>
>> On Mon, Nov 25, 2019 at 10:42 AM vino yang  wrote:
>>
>>> Hi Avi,
>>>
>>> As the doc of DataStream#split said, you can use the "side output"
>>> feature to replace it.[1]
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html>
>>>
>>> Best,
>>> Vino
>>>
>>> Avi Levi wrote on Mon, Nov 25, 2019 at 4:12 PM:
>>>
>>>> Hi,
>>>> I want to split the output of one of the operators to two pipelines.
>>>> Since the *split* method is deprecated, what is the idiomatic way to
>>>> do that without duplicating the operator ?
>>>>
>>>> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>>>>
>>>>
>>>>


Re: Idiomatic way to split pipeline

2019-11-25 Thread Avi Levi
Thank you for your quick reply, I appreciate that. But this is not
exactly "side output" per se; it is simple splitting. IIUC the side output
is more for splitting the records by something that differentiates them
(lateness, value, etc.). I thought there was something more idiomatic, but if this is
it, then I will go with that.

On Mon, Nov 25, 2019 at 10:42 AM vino yang  wrote:

> Hi Avi,
>
> As the doc of DataStream#split said, you can use the "side output" feature
> to replace it.[1]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html>
>
> Best,
> Vino
>
> Avi Levi wrote on Mon, Nov 25, 2019 at 4:12 PM:
>
>> Hi,
>> I want to split the output of one of the operators to two pipelines.
>> Since the *split* method is deprecated, what is the idiomatic way to do
>> that without duplicating the operator ?
>>
>> [image: Screen Shot 2019-11-25 at 10.05.38.png]
>>
>>
>>


Idiomatic way to split pipeline

2019-11-25 Thread Avi Levi
Hi,
I want to split the output of one of the operators to two pipelines. Since
the *split* method is deprecated, what is the idiomatic way to do that
without duplicating the operator ?

[image: Screen Shot 2019-11-25 at 10.05.38.png]
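As the replies above suggest, side outputs replace the deprecated split; a minimal sketch (the tag name and the even/odd criterion are assumptions):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

object SideOutputSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(1, 2, 3, 4, 5)

    val oddTag = OutputTag[Int]("odd")

    // main output keeps the even numbers, odd numbers go to the side output
    val evens = input.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int,
          ctx: ProcessFunction[Int, Int]#Context,
          out: Collector[Int]): Unit =
        if (value % 2 == 0) out.collect(value) else ctx.output(oddTag, value)
    })

    evens.print()
    evens.getSideOutput(oddTag).print()

    env.execute("side-output-sketch")
  }
}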


need some advice comparing sliding window to a single unit

2019-11-06 Thread Avi Levi
Hi,
I want to get the average of the last x hours and compare it to the sum of
the current hour.
I thought of using a ProcessWindowFunction over 8 hours and doing the calculation there,
i.e. consuming 8 hours of data, grouping it by the hour and doing the math, but
it seems very inefficient, especially considering that we are dealing with a
heavy load.
Is there any other, more elegant solution?

Cheers
Avi
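A sketch of an incremental alternative, assuming an Event(key, value) shape and a toy source (both assumptions): pre-aggregate each hour with an AggregateFunction, so whatever compares the last 8 hours only has to keep 8 small per-hour sums per key instead of 8 hours of raw events:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Event(key: String, value: Long)

object HourlySumsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events = env.fromElements(Event("a", 1L), Event("a", 2L)) // assumed source

    // one small (key, hourly sum) record per key per hour
    val hourlySums: DataStream[(String, Long)] = events
      .keyBy(_.key)
      .timeWindow(Time.hours(1))
      .aggregate(
        new AggregateFunction[Event, Long, Long] {
          override def createAccumulator(): Long = 0L
          override def add(e: Event, acc: Long): Long = acc + e.value
          override def getResult(acc: Long): Long = acc
          override def merge(a: Long, b: Long): Long = a + b
        },
        new ProcessWindowFunction[Long, (String, Long), String, TimeWindow] {
          override def process(key: String, context: Context,
              sums: Iterable[Long], out: Collector[(String, Long)]): Unit =
            out.collect((key, sums.head))
        })

    // a downstream keyed operator can now hold the last 8 sums per key
    // and compare the current hour against their average
    hourlySums.print()
    env.execute("hourly-sums-sketch")
  }
}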


Re: getting an exception

2019-08-06 Thread Avi Levi
Yeap that was it (deploying 1.8.1 over 1.8.0 ) thanks !!!
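A build.sbt sketch of the fix (the artifact list is an assumption; the point is keeping every Flink artifact, connectors included, on the same version as the cluster that runs the job):

// build.sbt
val flinkVersion = "1.8.1"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion
)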

On Mon, Aug 5, 2019 at 5:53 PM Gaël Renoux  wrote:

> Hi Avi and Victor,
>
> I just opened this ticket on JIRA:
> https://issues.apache.org/jira/browse/FLINK-13586
> <https://issues.apache.org/jira/browse/FLINK-13586>
> (I hadn't seen these e-mails). Backward compatibility is broken between
> 1.8.0 and 1.8.1 if you use Kafka connectors.
>
> Can you upgrade your flink-connector-kafka dependency to 1.8.1 ? It won't
> deploy on a 1.8.0 server any more, if that's a concern for you.
>
> Gaël
>
> On Mon, Aug 5, 2019 at 4:37 PM Wong Victor 
> wrote:
>
>> Hi Avi:
>>
>>
>>
>> It seems you are submitting your job with an older Flink version (< 1.8),
>> please check your flink-dist version.
>>
>>
>>
>> Regards,
>>
>> Victor
>>
>>
>>
>> *From: *Avi Levi 
>> *Date: *Monday, August 5, 2019 at 9:11 PM
>> *To: *user 
>> *Subject: *getting an exception
>>
>>
>>
>> Hi,
>>
>> I'm using Flink 1.8.1. our code is mostly using Scala.
>>
>> When I try to submit my job (on my local machine ) it crashes with the
>> error below (BTW on the IDE it runs perfectly).
>>
>> Any assistance would be appreciated.
>>
>> Thanks
>>
>> Avi
>>
>> 2019-08-05 12:58:03.783 [Flink-DispatcherRestEndpoint-thread-3] ERROR 
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  - Unhandled 
>> exception.
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program 
>> caused an error:
>>
>> at 
>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
>>
>> at 
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>
>> at 
>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>
>> at 
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>
>> at 
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.lang.NoSuchMethodError: 
>> org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Lorg/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel;Z)V
>>
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(http://FlinkKafkaProducer011.java:494
>>  <http://FlinkKafkaProducer011.java:494>)
>>
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(http://FlinkKafkaProducer011.java:448
>>  <http://FlinkKafkaProducer011.java:448>)
>>
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(http://FlinkKafkaProducer011.java:383
>>  <http://FlinkKafkaProducer011.java:383>)
>>
>> at 
>> com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer(KafkaImpl.scala:18)
>>
>> at 
>> com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer$(KafkaImpl.scala:18)
>>
>> at 
>> com.bluevoyant.lookalike.analytic.queueHandlers.QueueHandlerImpl$.producer(QueueHandlerImpl.scala:13)
>>
>> at 
>> com.bluevoyant.lookalike.analytic.StreamingJob$.delayedEndpoint$com$bluevoyant$lookalike$analytic$StreamingJob$1(StreamingJob.scala:42)
>>
>> at 
>> com.bluevoyant.lookalike.analytic.StreamingJob$delayedInit$body.apply(StreamingJob.scala:14)
>>
>> at scala.Function0.apply$mcV$sp(Function0.scala:34)
>>
>> at scala.Function0.apply$mcV$sp$(Function0.scala:34)
>>
>> at 
>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>
>> at scala.App.$anonfun$main$1$adapted(App.scala:76)
>>
>> at scala.collection.immutable.List.foreach(List.scala:388)
>>
>> at scala.App.main(App.scala:76)
>>
>> at scala.App.main$(App.scala:74)
>>
>> at 
>> co

getting an exception

2019-08-05 Thread Avi Levi
Hi,
I'm using Flink 1.8.1. our code is mostly using Scala.
When I try to submit my job (on my local machine ) it crashes with the
error below (BTW on the IDE it runs perfectly).
Any assistance would be appreciated.
Thanks
Avi

2019-08-05 12:58:03.783 [Flink-DispatcherRestEndpoint-thread-3] ERROR
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  -
Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The
program caused an error:
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError:
org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Lorg/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel;Z)V
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:494)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:448)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:383)
at 
com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer(KafkaImpl.scala:18)
at 
com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer$(KafkaImpl.scala:18)
at 
com.bluevoyant.lookalike.analytic.queueHandlers.QueueHandlerImpl$.producer(QueueHandlerImpl.scala:13)
at 
com.bluevoyant.lookalike.analytic.StreamingJob$.delayedEndpoint$com$bluevoyant$lookalike$analytic$StreamingJob$1(StreamingJob.scala:42)
at 
com.bluevoyant.lookalike.analytic.StreamingJob$delayedInit$body.apply(StreamingJob.scala:14)
at scala.Function0.apply$mcV$sp(Function0.scala:34)
at scala.Function0.apply$mcV$sp$(Function0.scala:34)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:388)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at 
com.bluevoyant.lookalike.analytic.StreamingJob$.main(StreamingJob.scala:14)
at 
com.bluevoyant.lookalike.analytic.StreamingJob.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
... 7 common frames omitted


Re: State incompatible

2019-07-15 Thread Avi Levi
Thanks Haibo,
bummer ;)

On Mon, Jul 15, 2019 at 12:27 PM Haibo Sun  wrote:

> Hi,  Avi Levi
>
> I don't think there's any way to solve this problem right now, and Flink
> documentation clearly shows that this is not supported.
>
> “Trying to restore state, which was previously configured without TTL,
> using TTL enabled descriptor or vice versa will lead to compatibility
> failure and StateMigrationException."
>
> Flink Document:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
> <https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl>
>
> Best,
> Haibo
>
> At 2019-07-14 16:50:19, "Avi Levi"  wrote:
>
> Hi,
> I added a ttl to my state
> *old version :*
>  private lazy val stateDescriptor = new ValueStateDescriptor("foo",
> Types.CASE_CLASS[DomainState])
>
> *vs the new version *
>
> @transient
> private lazy val storeTtl = StateTtlConfig.newBuilder(90)
>   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>   .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>   .cleanupInRocksdbCompactFilter()
>   .build()
>
>   private lazy val stateDescriptor = {
> val des = new ValueStateDescriptor("foo",
> Types.CASE_CLASS[DomainState])
> des.enableTimeToLive(storeTtl)
> des
>   }
>
> *BUT when trying to restore from savepoint I am getting this error:*
>
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>   ...
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
>   at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
>   at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
>   at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
>   at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>   at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
>   at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>   ... 11 more
>
>
> Do you have any idea how can I resolve it ?
>
>
> Best wishes
>
>


State incompatible

2019-07-14 Thread Avi Levi
Hi,
I added a ttl to my state
*old version :*
 private lazy val stateDescriptor = new ValueStateDescriptor("foo",
Types.CASE_CLASS[DomainState])

*vs the new version *

@transient
private lazy val storeTtl = StateTtlConfig.newBuilder(90)
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupInRocksdbCompactFilter()
  .build()

  private lazy val stateDescriptor = {
val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
des.enableTimeToLive(storeTtl)
des
  }

*BUT when trying to restore from savepoint I am getting this error:*

java.lang.RuntimeException: Error while getting state
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
...


Caused by: org.apache.flink.util.StateMigrationException: The new
state serializer cannot be incompatible.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 11 more


Do you have any idea how can I resolve it ?


Best wishes


Re: Queryable state and TTL

2019-07-06 Thread Avi Levi
Thanks, I'll check it out.

On Sun, Jul 7, 2019 at 5:40 AM Eron Wright  wrote:

> Here's a PR for queryable state TLS that I closed because I didn't have
> time, and because I get the impression that the queryable state feature is
> used very often.Feel free to take it up, if you like.
> https://github.com/apache/flink/pull/6626
> <https://github.com/apache/flink/pull/6626>
>
> -Eron
>
> On Wed, Jul 3, 2019 at 11:21 PM Avi Levi  wrote:
>
>> Hi Yu,
>> Our sink is actually Kafka, hence we cannot query it properly; from there
>> we distribute it to different consumers. We keep info in our state, such as
>> entry time, some accumulated data, etc.; this data is not kept elsewhere,
>> hence we need to query our state.
>>
>> Best regards
>> Avi
>>
>>
>> On Thu, Jul 4, 2019 at 7:20 AM Yu Li  wrote:
>>
>>> Thanks for the ping Andrey.
>>>
>>> For me the general answer is yes, but TBH it will probably not be added
>>> in the foreseeable future due to lack of committer bandwidth (not only
>>> QueryableState with TTL but all about QueryableState module) as per
>>> Aljoscha pointed out in another thread [1].
>>>
>>> Although we could see emerging requirements and proposals on
>>> QueryableState recently, prioritizing is important for each open source
>>> project. And personally I think it may help if we could gather more and
>>> clearly describe the other-than-debugging use cases of QueryableState in
>>> production [2]. Could you share your case with us and why QueryableState is
>>> necessary rather than querying the data from sink @Avi? Thanks.
>>>
>>> [1] https://s.apache.org/MaOl
>>> <https://s.apache.org/MaOl>
>>> [2] https://s.apache.org/hJDA
>>> <https://s.apache.org/hJDA>
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Wed, 3 Jul 2019 at 23:13, Andrey Zagrebin 
>>> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> It is on the road map but I am not aware about plans of any contributor
>>>> to work on it for the next releases.
>>>> I think the community will firstly work on the event time support for
>>>> TTL.
>>>> I will loop Yu in, maybe he has some plans to work on TTL for the
>>>> queryable state.
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On Wed, Jul 3, 2019 at 3:17 PM Avi Levi 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Adding queryable state to state with ttl is not supported at 1.8.0
>>>>> (throwing java.lang.IllegalArgumentException: Queryable state is currently
>>>>> not supported with TTL)
>>>>>
>>>>> I saw in previous mailing thread
>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201808.mbox/%3c300f09f0-053e-43ba-a993-1259816ad...@data-artisans.com%3E>that
>>>>> it is on the roadmap. Is it still on the roadmap ?
>>>>>
>>>>> * There is a workaround, which is using timers to clear the state, but
>>>>> in our case it means firing billions of timers on a daily basis, all at the
>>>>> same time, which seems not very efficient and might cause some resource
>>>>> issues
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>>
>>>>>


Re: Queryable state and TTL

2019-07-04 Thread Avi Levi
Hi Yu,
Our sink is actually Kafka, hence we cannot query it properly; from there we
distribute it to different consumers. We keep info in our state, such as
entry time, some accumulated data, etc.; this data is not kept elsewhere,
hence we need to query our state.

Best regards
Avi


On Thu, Jul 4, 2019 at 7:20 AM Yu Li  wrote:

> Thanks for the ping Andrey.
>
> For me the general answer is yes, but TBH it will probably not be added in
> the foreseeable future due to lack of committer bandwidth (not only
> QueryableState with TTL but all about QueryableState module) as per
> Aljoscha pointed out in another thread [1].
>
> Although we could see emerging requirements and proposals on
> QueryableState recently, prioritizing is important for each open source
> project. And personally I think it may help if we could gather more and
> clearly describe the other-than-debugging use cases of QueryableState in
> production [2]. Could you share your case with us and why QueryableState is
> necessary rather than querying the data from sink @Avi? Thanks.
>
> [1] https://s.apache.org/MaOl
> <https://s.apache.org/MaOl>
> [2] https://s.apache.org/hJDA
> <https://s.apache.org/hJDA>
>
> Best Regards,
> Yu
>
>
> On Wed, 3 Jul 2019 at 23:13, Andrey Zagrebin  wrote:
>
>> Hi Avi,
>>
>> It is on the road map but I am not aware about plans of any contributor
>> to work on it for the next releases.
>> I think the community will firstly work on the event time support for TTL.
>> I will loop Yu in, maybe he has some plans to work on TTL for the
>> queryable state.
>>
>> Best,
>> Andrey
>>
>> On Wed, Jul 3, 2019 at 3:17 PM Avi Levi  wrote:
>>
>>> Hi,
>>> Adding queryable state to state with ttl is not supported at 1.8.0
>>> (throwing java.lang.IllegalArgumentException: Queryable state is currently
>>> not supported with TTL)
>>>
>>> I saw in previous mailing thread
>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201808.mbox/%3c300f09f0-053e-43ba-a993-1259816ad...@data-artisans.com%3E>that
>>> it is on the roadmap. Is it still on the roadmap ?
>>>
>>> * There is a workaround, which is using timers to clear the state, but in
>>> our case it means firing billions of timers on a daily basis, all at the same
>>> time, which seems not very efficient and might cause some resource
>>> issues
>>>
>>> Cheers
>>> Avi
>>>
>>>
>>>


Queryable state and TTL

2019-07-03 Thread Avi Levi
Hi,
Adding queryable state to state with ttl is not supported at 1.8.0
(throwing java.lang.IllegalArgumentException: Queryable state is currently
not supported with TTL)

I saw in previous mailing thread
that
it is on the roadmap. Is it still on the roadmap ?

* There is a workaround, which is using timers to clear the state, but in
our case it means firing billions of timers on a daily basis, all at the same
time, which seems not very efficient and might cause some resource
issues

Cheers
Avi


Re: Connection refused while trying to query state

2019-07-02 Thread Avi Levi
No, it doesn't. Thanks for pointing it out. I just noticed that I wasn't using
the proxy server address.
Thanks !!!
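For reference, a minimal sketch of the client setup (host and port are assumptions; they must match the address reported by the "Started the Queryable State Proxy Server @ ..." log line, 9069 being the default proxy port):

import org.apache.flink.queryablestate.client.QueryableStateClient

// point the client at the task manager's queryable state proxy, not the job manager
val client = new QueryableStateClient("127.0.0.1", 9069)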


On Tue, Jul 2, 2019 at 12:16 PM Kostas Kloudas  wrote:

> Hi Avi,
>
> Do you point the client to the correct address? This means where the
> "Queryable State Proxy Server @ ..." says?
>
> Cheers,
> Kostas
>
> On Sun, Jun 30, 2019 at 4:37 PM Avi Levi  wrote:
>
>> Hi,
>> I am trying to query state (cluster 1.8.0 is running on my local machine)
>> .
>> I do see in the logs "Started the Queryable State Proxy Server @ ...".
>>
>> but when I am trying to query the state from the client ,
>> val descriptor = new ValueStateDescriptor("queryable-state",
>> Types.CASE_CLASS[State])
>> client.getKvState(jobId, "seen-domains",key,
>> BasicTypeInfo.STRING_TYPE_INFO, descriptor)
>> I am getting the following exception :
>>
>> [ERROR] [06/30/2019 16:59:48.850]
>> [bvAkkaHttpServer-akka.actor.default-dispatcher-10]
>> [akka.actor.ActorSystemImpl(AkkaHttpServer)] Error during processing of
>> request:
>> 'org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> Connection refused: /127.0.0.1:9069
>> <http://127.0.0.1:9069>'.
>> Completing with 500 Internal Server Error response. To change default
>> exception handling behavior, provide a custom ExceptionHandler.
>> java.util.concurrent.CompletionException:
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> Connection refused: /127.0.0.1:9069
>> <http://127.0.0.1:9069>
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> at
>> org.apache.flink.queryablestate.network.Client$PendingConnection.close(Client.java:377)
>> at
>> org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:270)
>> at
>> org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:231)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> Connection refused: /127.0.0.1:9069
>> <http://127.0.0.1:9069>
>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
>> ... 6 more
>> Caused by: java.net.ConnectException: Connection refused
>> ... 10 more
>>
>>


Setting consumer offset

2019-07-02 Thread Avi Levi
Hi,
If I set the consumer offset in code, e.g. *consumer.setStartFromTimestamp*,
and I start the job from a certain savepoint/checkpoint, will the offset in
the checkpoint override the offset that is defined in the code?

Best Regards
Avi
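A short sketch for context (topic, properties and the timestamp are assumptions): the configured start position is only used when the job starts without restored state; when restoring from a savepoint/checkpoint, the offsets stored in that state take precedence.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // assumed

val consumer = new FlinkKafkaConsumer011[String]("my-topic", new SimpleStringSchema(), props)
// only applies on a fresh start, i.e. when there is no checkpoint/savepoint to restore from
consumer.setStartFromTimestamp(1562025600000L)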


Connection refused while trying to query state

2019-06-30 Thread Avi Levi
Hi,
I am trying to query state (cluster 1.8.0 is running on my local machine) .
I do see in the logs "Started the Queryable State Proxy Server @ ...".

but when I am trying to query the state from the client ,
val descriptor = new ValueStateDescriptor("queryable-state",
Types.CASE_CLASS[State])
client.getKvState(jobId, "seen-domains",key,
BasicTypeInfo.STRING_TYPE_INFO, descriptor)
I am getting the following exception :

[ERROR] [06/30/2019 16:59:48.850]
[bvAkkaHttpServer-akka.actor.default-dispatcher-10]
[akka.actor.ActorSystemImpl(AkkaHttpServer)] Error during processing of
request:
'org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: /127.0.0.1:9069'. Completing with 500 Internal Server
Error response. To change default exception handling behavior, provide a
custom ExceptionHandler.
java.util.concurrent.CompletionException:
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: /127.0.0.1:9069
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.queryablestate.network.Client$PendingConnection.close(Client.java:377)
at
org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:270)
at
org.apache.flink.queryablestate.network.Client$PendingConnection.operationComplete(Client.java:231)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: /127.0.0.1:9069
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
... 6 more
Caused by: java.net.ConnectException: Connection refused
... 10 more


Re: Getting async function call terminated with an exception

2019-05-12 Thread Avi Levi
Thank you! that did it !
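A minimal sketch of the corrected asyncInvoke (Foo and ScoredFoo stand in for the types from the original question):

import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

case class Foo(name: String)
case class ScoredFoo(foo: Foo, score: Int)

class ScoringAsyncFunction extends AsyncFunction[Foo, ScoredFoo] {
  override def asyncInvoke(input: Foo, resultFuture: ResultFuture[ScoredFoo]): Unit = {
    val r = ScoredFoo(input, 80)
    // complete the ResultFuture that Flink passes in; a Future returned from
    // this method is never seen by the AsyncWaitOperator
    resultFuture.complete(Iterable(r))
  }
}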

On Wed, May 8, 2019 at 5:42 PM Till Rohrmann  wrote:

> Hi Avi,
>
> you need to complete the given resultFuture and not return a future. You
> can do this via resultFuture.complete(r).
>
> Cheers,
> Till
>
> On Tue, May 7, 2019 at 8:30 PM Avi Levi  wrote:
>
>> Hi,
>> We are using Flink 1.8.0 (but the following also happens in 1.7.2). I tried a
>> very simple unordered async call:
>> override def asyncInvoke(input: Foo, resultFuture:
>> ResultFuture[ScoredFoo]) : Unit  = {
>>val r = ScoredFoo(Foo("a"), 80)
>>Future.successful(r)
>>}
>>
>> Running this stream seems to be stuck in some infinite loop until it
>> crashes with a timeout exception:
>>
>> *java.lang.Exception: An async function call terminated with an
>> exception. Failing the AsyncWaitOperator.*
>> *at
>> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)*
>> *at
>> org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)*
>> *at java.base/java.lang.Thread.run(Thread.java:844)*
>> *Caused by: java.util.concurrent.ExecutionException:
>> java.util.concurrent.TimeoutException: Async function call has timed out.*
>> *at
>> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)*
>> *at
>> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)*
>> *at
>> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)*
>> *at
>> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)*
>> *... 2 common frames omitted*
>> *Caused by: java.util.concurrent.TimeoutException: Async function call
>> has timed out.*
>> *at
>> org.apache.flink.streaming.api.scala.async.AsyncFunction.timeout(AsyncFunction.scala:60)*
>> *at
>> org.apache.flink.streaming.api.scala.async.AsyncFunction.timeout$(AsyncFunction.scala:59)*
>> *at
>> com.lookalike.analytic.utils.LookalikeScoreEnrich.timeout(LookalikeScoreEnrich.scala:18)*
>> *at
>> org.apache.flink.streaming.api.scala.AsyncDataStream$$anon$3.timeout(AsyncDataStream.scala:301)*
>> *at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$1.onProcessingTime(AsyncWaitOperator.java:211)*
>> *at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)*
>> *at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)*
>> *at
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)*
>> *at
>> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)*
>> *at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)*
>> *at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)*
>> *... 1 common frames omitted*
>>
>> Please advise , Thanks
>> Avi
>>
>>


Getting async function call terminated with an exception

2019-05-07 Thread Avi Levi
Hi,
We are using Flink 1.8.0 (but the following also happens in 1.7.2). I tried a
very simple unordered async call:
override def asyncInvoke(input: Foo, resultFuture: ResultFuture[ScoredFoo])
: Unit  = {
   val r = ScoredFoo(Foo("a"), 80)
   Future.successful(r)
   }

Running this stream seems to be stuck in some infinite loop until it crashes
with a timeout exception:

*java.lang.Exception: An async function call terminated with an exception.
Failing the AsyncWaitOperator.*
*at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)*
*at
org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)*
*at java.base/java.lang.Thread.run(Thread.java:844)*
*Caused by: java.util.concurrent.ExecutionException:
java.util.concurrent.TimeoutException: Async function call has timed out.*
*at
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)*
*at
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)*
*at
org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)*
*at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)*
*... 2 common frames omitted*
*Caused by: java.util.concurrent.TimeoutException: Async function call has
timed out.*
*at
org.apache.flink.streaming.api.scala.async.AsyncFunction.timeout(AsyncFunction.scala:60)*
*at
org.apache.flink.streaming.api.scala.async.AsyncFunction.timeout$(AsyncFunction.scala:59)*
*at
com.lookalike.analytic.utils.LookalikeScoreEnrich.timeout(LookalikeScoreEnrich.scala:18)*
*at
org.apache.flink.streaming.api.scala.AsyncDataStream$$anon$3.timeout(AsyncDataStream.scala:301)*
*at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$1.onProcessingTime(AsyncWaitOperator.java:211)*
*at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)*
*at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)*
*at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)*
*at
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)*
*at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)*
*at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)*
*... 1 common frames omitted*

Please advise , Thanks
Avi


Re: Emitting current state to a sink

2019-04-30 Thread Avi Levi
Sure!
You get the context and the collector in the processBroadcastElement method;
see the snippet below:

  override def processBroadcastElement(value: BroadcastRequest,
      ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, String]#Context,
      out: Collector[String]): Unit = {

    ctx.applyToKeyedState(stateDescriptor, new KeyedStateFunction[String, ValueState[String]] {
      override def process(key: String, state: ValueState[String]): Unit =
        Option(state.value()).foreach(s => out.collect(s))
    })

    ...
  }


On Mon, Apr 29, 2019 at 5:45 PM M Singh  wrote:

> Hi Avi:
>
> Can you please elaborate (or include an example/code snippet) of how you
> were able to achieve collecting the keyed states from the
> processBroadcastElement method using the applyToKeyedState ?
>
> I am trying to understand which collector you used to emit the state since
> the broadcasted elements/state might be different from the non-broadcast
> elements/state.
>
> Thanks for your help.
>
> Mans
> On Monday, April 29, 2019, 7:29:23 AM EDT, Fabian Hueske <
> fhue...@gmail.com> wrote:
>
>
> Nice!
> Thanks for the confirmation :-)
>
> On Mon, Apr 29, 2019 at 13:21, Avi Levi <avi.l...@bluevoyant.com> wrote:
>
> Thanks! Works like a charm :)
>
> On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske  wrote:
>
> Hi Avi,
>
> I'm not sure if  you cannot emit data from the keyed state when you
> receive a broadcasted message.
> The Context parameter of the processBroadcastElement() method in the
> KeyedBroadcastProcessFunction has the applyToKeyedState() method.
> The method takes a KeyedStateFunction that is applied to each key of a
> state, but does not provide a Collector to emit data.
> Maybe you can pass the collector to the KeyedStateFunction and emit
> records while it iterates over the key space.
>
> Best, Fabian
>
> On Fri, Apr 26, 2019 at 17:35, Avi Levi <avi.l...@bluevoyant.com> wrote:
>
> Hi Timo,
>> I definitely did. But broadcasting a command and trying to address the
>> persisted state (I mean the state of the data stream and not the
>> broadcasted one), you get the exception that I wrote
>> (java.lang.NullPointerException: No key set. This method should not be
>> called outside of a keyed context), e.g. doing something like
>>
>> override def processBroadcastElement(value: BroadcastRequest,
>>     ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response]#Context,
>>     out: Collector[Response]): Unit = {
>>   value match {
>>     case Command(StateCmd.Fetch, _) =>
>>       if (state.value() != null) {
>>         out.collect(state.value())
>>       }
>
> will yield that exception
>
> BR
> Avi
>
> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:
>
>
> Hi Avi,
>
> did you have a look at the .connect() and .broadcast() API
> functionalities? They allow you to broadcast a control stream to all
> operators. Maybe this example [1] or other examples in this repository
> can help you.
>
> Regards,
> Timo
>
> [1]
>
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
> <https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java>
>
> On 26.04.19 at 07:57, Avi Levi wrote:
> > Hi,
> > We have a keyed pipeline with persisted state.
> > Is there a way to broadcast a command and collect all values that
> > persisted in  the state ?
> >
> > The end result can be for example sending a fetch command to all
> > operators and emitting the results to some sink
> >
> > why do we need it ? from time to time we might want to check if we are
> > missing keys what are the additional keys or simply emit the current
> > state to a table and to query it.
> >
> > I tried simply broadcasting a command and addressing the persisted
> > state but that resulted with:
> > java.lang.NullPointerException: No key set. This method should not be
> > called outside of a keyed context.
> >
> > is there a good way to achieve that ?
> >
> > Cheers
> > Avi
>
>


Re: Emitting current state to a sink

2019-04-29 Thread Avi Levi
Thanks! Works like a charm :)

On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske  wrote:

> Hi Avi,
>
> I'm not sure if  you cannot emit data from the keyed state when you
> receive a broadcasted message.
> The Context parameter of the processBroadcastElement() method in the
> KeyedBroadcastProcessFunction has the applyToKeyedState() method.
> The method takes a KeyedStateFunction that is applied to each key of a
> state, but does not provide a Collector to emit data.
> Maybe you can pass the collector to the KeyedStateFunction and emit
> records while it iterates over the key space.
>
> Best, Fabian
>
> On Fri, Apr 26, 2019 at 17:35, Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Hi Timo,
>> I definitely did. But broadcasting a command and trying to address the
>> persisted state (I mean the state of the data stream and not the
>> broadcasted one), you get the exception that I wrote
>> (java.lang.NullPointerException: No key set. This method should not be
>> called outside of a keyed context), e.g. doing something like
>>
>> override def processBroadcastElement(value: BroadcastRequest,
>>     ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response]#Context,
>>     out: Collector[Response]): Unit = {
>>   value match {
>>     case Command(StateCmd.Fetch, _) =>
>>       if (state.value() != null) {
>>         out.collect(state.value())
>>       }
>>
>> will yield that exception
>>
>> BR
>> Avi
>>
>> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:
>>
>>>
>>> Hi Avi,
>>>
>>> did you have a look at the .connect() and .broadcast() API
>>> functionalities? They allow you to broadcast a control stream to all
>>> operators. Maybe this example [1] or other examples in this repository
>>> can help you.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>>
>>> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
>>> <https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java>
>>>
>>> On 26.04.19 at 07:57, Avi Levi wrote:
>>> > Hi,
>>> > We have a keyed pipeline with persisted state.
>>> > Is there a way to broadcast a command and collect all values that
>>> > persisted in  the state ?
>>> >
>>> > The end result can be for example sending a fetch command to all
>>> > operators and emitting the results to some sink
>>> >
>>> > why do we need it ? from time to time we might want to check if we are
>>> > missing keys what are the additional keys or simply emit the current
>>> > state to a table and to query it.
>>> >
>>> > I tried simply broadcasting a command and addressing the persisted
>>> > state but that resulted with:
>>> > java.lang.NullPointerException: No key set. This method should not be
>>> > called outside of a keyed context.
>>> >
>>> > is there a good way to achieve that ?
>>> >
>>> > Cheers
>>> > Avi
>>>
>>>


Re: Emitting current state to a sink

2019-04-26 Thread Avi Levi
Hi Timo,
I definitely did. But broadcasting a command and trying to address the
persisted state (I mean the state of the data stream and not the
broadcasted one), you get the exception that I wrote
(java.lang.NullPointerException: No key set. This method should not be
called outside of a keyed context), e.g. doing something like

override def processBroadcastElement(value: BroadcastRequest,
    ctx: KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, Response]#Context,
    out: Collector[Response]): Unit = {
  value match {
    case Command(StateCmd.Fetch, _) =>
      if (state.value() != null) {
        out.collect(state.value())
      }

will yield that exception

BR
Avi

On Fri, Apr 26, 2019 at 11:55 AM Timo Walther  wrote:

>
> Hi Avi,
>
> did you have a look at the .connect() and .broadcast() API
> functionalities? They allow you to broadcast a control stream to all
> operators. Maybe this example [1] or other examples in this repository
> can help you.
>
> Regards,
> Timo
>
> [1]
>
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java
> <https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java>
>
> On 26.04.19 at 07:57, Avi Levi wrote:
> > Hi,
> > We have a keyed pipeline with persisted state.
> > Is there a way to broadcast a command and collect all values that
> > persisted in  the state ?
> >
> > The end result can be for example sending a fetch command to all
> > operators and emitting the results to some sink
> >
> > why do we need it ? from time to time we might want to check if we are
> > missing keys what are the additional keys or simply emit the current
> > state to a table and to query it.
> >
> > I tried simply broadcasting a command and addressing the persisted
> > state but that resulted with:
> > java.lang.NullPointerException: No key set. This method should not be
> > called outside of a keyed context.
> >
> > is there a good way to achieve that ?
> >
> > Cheers
> > Avi
>
>


Emitting current state to a sink

2019-04-25 Thread Avi Levi
Hi,
We have a keyed pipeline with persisted state.
Is there a way to broadcast a command and collect all values that persisted
in  the state ?

The end result can be for example sending a fetch command to all operators
and emitting the results to some sink

why do we need it ? from time to time we might want to check if we are
missing keys what are the additional keys or simply emit the current state
to a table and to query it.

I tried simply broadcasting a command and addressing the persisted state
but that resulted with:
java.lang.NullPointerException: No key set. This method should not be
called outside of a keyed context.

is there a good way to achieve that ?

Cheers
Avi


Re: Getting JobExecutionException: Could not set up JobManager when trying to upload new version

2019-04-23 Thread Avi Levi
Might be useful for someone regarding this issue: it seems that changing
the uid of the operator caused this mess.
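A short sketch of the takeaway (the stream, operator and sink names are assumptions): give stateful operators explicit uids and keep them stable across releases so the savepoint state can still be mapped; if an operator was intentionally removed, resubmit with --allowNonRestoredState as the error suggests.

stream
  .keyBy(_.id)
  .process(new MyKeyedProcessFunction)   // assumed stateful operator
  .uid("domain-state-process")           // keep this uid unchanged between versions
  .addSink(kafkaProducer)                // assumed sink
  .uid("kafka-sink")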

On Tue, Apr 16, 2019 at 6:31 PM Avi Levi  wrote:

> I am trying to upload a new version of the code but I am getting the
> exception below. The schema of the state was not changed for a while . what
> can be the reason for that (also attached the log file) ?
>>
>>
>> 2019-04-16 15:14:11.112 [flink-akka.actor.default-dispatcher-1138] ERROR
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Failed to
>> submit job 693a02204ef5816f91ea3b135f544a7f.
>> java.lang.RuntimeException:
>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>> JobManager
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>> ... 7 common frames omitted
>> Caused by: java.lang.IllegalStateException: Failed to rollback to
>> checkpoint/savepoint
>> gs://bv-flink-state/dev/state/savepoint-7cbaf2-48f14797. Cannot map
>> checkpoint/savepoint state for operator 3cfeb06db0484d5556a7de8db2025f09 to
>> the new program, because the operator is not available in the new program.
>> If you want to allow to skip this, you can set the --allowNonRestoredState
>> option on the CLI.
>> at
>> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1103)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1241)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1165)
>> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:296)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
>> ... 10 common frames omitted
>> 2019-04-16 15:14:11.242 [flink-akka.actor.default-dispatcher-1138] ERROR
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  - Exception
>> occurred in REST handler:
>> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
>> job.
>> 2019-04-16 15:14:11.947 [flink-akka.actor.default-dispatcher-1155] ERROR
>> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  - Exception
>> occurred in REST handler: Job e43ed04cdedd73f9bb9836e87142afbf not found
>>
>
> Thanks for your help.
>
> Cheers
> Avi
>


Re: Adding metadata to the jar

2019-04-10 Thread Avi Levi
Cool, thanks! Those are great options and just what I was looking for
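A small sketch combining the two suggestions (the fallback string is an assumption): read the version the build wrote into the jar manifest and expose it under the job's user configuration:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Implementation-Version is whatever the build wrote into META-INF/MANIFEST.MF
val version = Option(getClass.getPackage.getImplementationVersion).getOrElse("unknown")

val buildInfo = new Configuration()
buildInfo.setString("version", version)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(buildInfo)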

On Tue, Apr 9, 2019 at 12:42 AM Timothy Victor  wrote:

> One approach I use is to write the git commit sha to the jars manifest
> while compiling it (I don't use semantic versioning but rather use commit
> sha).
>
> Then at runtime I read the implementationVersion
> (class.getPackage().getImplementationVersion()), and print that in the job
> name.
>
> Tim
>
> On Mon, Apr 8, 2019, 4:29 PM Bruno Aranda  wrote:
>
>> Hi Avi,
>>
>> Don't know if there are better ways, but we store the version of the job
>> running and other metadata as part of the "User configuration" of the job,
>> so it shows in the UI when you go to the job Configuration tab inside the
>> job. To do so, when we create the job:
>>
>> val buildInfo = new Configuration()
>> buildInfo.setString("version", "0.1.0")
>>
>>
>> val env = StreamExecutionEnvironment.*getExecutionEnvironment
>> *env.getConfig.setGlobalJobParameters(buildInfo)
>> ...
>>
>> It helps us to have a convenient way of knowing what version of the jobs
>> are running, when they were built, etc...
>>
>> Cheers,
>>
>> Bruno
>>
>>
>> On Mon, 8 Apr 2019 at 18:04, Avi Levi  wrote:
>>
>>> Is there a way to add some metadata to the jar and see it on the dashboard?
>>> I couldn't find a way to do so, but I think it would be very useful.
>>> Consider that you want to know which version is actually running in the
>>> job manager (not just which jar is uploaded, which is not necessarily
>>> running at the moment). AFAIK, by looking at the dashboard there is no way to
>>> know which jar / version is actually executing. Well, as a workaround you
>>> can add something to the job name, but this is a limited option; what if one
>>> wants to add more info programmatically?
>>>
>>> Is there a way to do it ?
>>>
>>> BR
>>> Avi
>>>
>>


Adding metadata to the jar

2019-04-08 Thread Avi Levi
Is there a way to add some metadata to the jar and see it on the dashboard? I
couldn't find a way to do so, but I think it would be very useful.
Consider that you want to know which version is actually running in the job
manager (not just which jar is uploaded, which is not necessarily
running at the moment). AFAIK, by looking at the dashboard there is no way to
know which jar / version is actually executing. Well, as a workaround you
can add something to the job name, but this is a limited option; what if one
wants to add more info programmatically?

Is there a way to do it ?

BR
Avi


How to see user configuration in UI dashboard

2019-04-07 Thread Avi Levi
Is it possible (and if so, how) to set custom parameters programmatically that
can be viewed in the configuration tab of the UI, either under execution config or user
configuration?


Re: AskTimeoutException - Cannot deploy task

2019-03-28 Thread Avi Levi
Hi All,
Following my previous mail below, I see the exception below.
I would really appreciate any help here.
The log files are attached.
Looking at the logs, we see this exception all around:

2019-03-28 23:51:58,460 WARN  org.apache.kafka.common.network.Selector
 - Unexpected error from
kafka-3.c.mako.internal/10.157.9.103; closing connection
java.lang.IllegalStateException: Buffer overflow when available data
size (16384) >= application buffer size (16384)
at 
org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:470)
at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:85)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
at 
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)



-- Forwarded message -
From: Avi Levi 
Date: Thu, Mar 28, 2019 at 11:03 PM
Subject: AskTimeoutException - Cannot deploy task
To: user 


Hi,
I see the following exceptions; I would really appreciate any help with them.

Thanks

Avi


This is the first one (out of three) :


java.lang.Exception: Cannot deploy task KeyedProcess -> Sink: Unnamed
(3/100) (2c9646634afe1488659da404e92697b0) - TaskManager
(container_e03_1553795623823_0001_01_03 @
dataproc-cluster-w-14.XXX (dataPort=45777)) not responding after a
rpcTimeout of 1 ms
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:624)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://fl...@dataproc-cluster-w-14.c.:38047/user/taskmanager_0#-1921747025]]
after [1 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
... 1 more


and this is the second:

org.apache.flink.util.FlinkException: The assigned slot
container_e03_1553795623823_0001_01_44_3 was removed.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(

AskTimeoutException - Cannot deploy task

2019-03-28 Thread Avi Levi
Hi,
I see the following exceptions; I would really appreciate any help with them.

Thanks

Avi


This is the first one (out of three) :


java.lang.Exception: Cannot deploy task KeyedProcess -> Sink: Unnamed
(3/100) (2c9646634afe1488659da404e92697b0) - TaskManager
(container_e03_1553795623823_0001_01_03 @
dataproc-cluster-w-14.XXX (dataPort=45777)) not responding after a
rpcTimeout of 1 ms
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:624)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://fl...@dataproc-cluster-w-14.c.:38047/user/taskmanager_0#-1921747025]]
after [1 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
... 1 more


and this is the second:

org.apache.flink.util.FlinkException: The assigned slot
container_e03_1553795623823_0001_01_44_3 was removed.
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
at 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Third one


java.util.concurrent.TimeoutException: Heartbeat of 

Re: questions regarding offset

2019-03-28 Thread Avi Levi
Thanks for answering. please see my comments below

On Thu, Mar 28, 2019 at 12:32 PM Dawid Wysakowicz 
wrote:

> Hi Avi,
>
> Yes, you are right. Kafka offsets are kept in state.
>
> Ad. 1 If you try to restore a state in a completely different
> environment, and offsets are no longer compatible it will most probably
> fail as it won't be able to derive up to which point we already
> processed the records.
>
So there is no way to move state between clusters? I thought that the
offsets were also managed by job id, but I guess I was wrong.

>
> Ad.2 What do you mean by stateless job? Do you mean a job with
> checkpoints disabled? If so then the job does not checkpoint kafka
> offsets. They might be committed back to Kafka based on the internal
> Kafka consumer configuration[1]. So in case of failover it will use
> given start position configuration[2].
>

By stateless I mean a job that does not need to persist any state of its
own, but has checkpoints enabled.

>
> Best,
>
> Dawid
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
>
> On 28/03/2019 06:51, Avi Levi wrote:
> > Hi Guys,
> > I understood that offset is kept as part of the checkpoint and
> > persisted in the state (please correct me if I'm wrong)
> >
> > 1. If I copy my persisted state to another cluster (different kafka
> > servers as well) how is the offset handled ?
> > 2. In a stateless job how is the offset managed ? since there is no
> > persistency . I mean in aspect of exactly once, recovery ...
> >
> > BR
> > Avi
>
>


questions regarding offset

2019-03-27 Thread Avi Levi
Hi Guys,
I understood that the offset is kept as part of the checkpoint and persisted
in the state (please correct me if I'm wrong).

1. If I copy my persisted state to another cluster (with different Kafka
servers as well), how is the offset handled?
2. In a stateless job, how is the offset managed, since there is no
persistence? I mean in terms of exactly-once and recovery...

BR
Avi


Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

2019-03-25 Thread Avi Levi
Thanks, I'll check it out. I got a bit confused by the ingestion time being
null in tests, but all is OK now. I appreciate it.

On Mon, Mar 25, 2019 at 1:01 PM Kostas Kloudas  wrote:

> Hi Avi,
>
> Just to verify your ITCase, I wrote the following dummy example and it
> seems to be "working" (ie. I can see non null timestamps and timers firing).
>
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> env.setParallelism(1);
>
> env
>   .addSource(new LongSource())
>   .keyBy(elmnt -> elmnt)
>   .process(new KeyedProcessFunction() {
>
>  @Override
>  public void processElement(Long value, Context ctx, Collector 
> out) throws Exception {
>
>
> long timestamp = ctx.timestamp();
> long timerTimestamp = timestamp + 
> Time.seconds(10).toMilliseconds();
>
> System.out.println(ctx.timestamp() + " " + timerTimestamp);
>
> ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
>  }
>
>  @Override
>  public void onTimer(long timestamp, OnTimerContext ctx, 
> Collector out) throws Exception {
> System.out.println("TIMER: " + timestamp +" "+ ctx.timeDomain());
>  }
>   }).print();
> env.execute();
>
> The source is:
>
> private static final class LongSource implements SourceFunction {
>
>private volatile boolean running = true;
>
>private long element = 0L;
>
>@Override
>public void run(SourceContext ctx) throws Exception {
>   while (running) {
>  ctx.collect(element++ % 10);
>  Thread.sleep(10L);
>   }
>}
>
>@Override
>public void cancel() {
>
>}
> }
>
>
> Could you provide more details on how your usecase differs from the above
> dummy example so that we can pin down the problem?
>
> As a side-note, Ingestion time is essentially event time, with the only
> difference that the timestamp assigner in the beginning gives each element
> the timestamp System.currentTimeMillis. So in this case, maybe you could
> also consider setting event time timers but keep in mind then your
> Watermark emission interval.
>
> In addition, if you want to simply check processing time processing of you
> operator (not the whole pipeline), then you could make use of the
> OneInputStreamTaskTestHarness or its keyed variant. This allows you to
> provide your own processing time provider thus allow you to
> deterministically
> test processing time behaviour.
>
> Cheers,
> Kostas
>
>
>
> On Sat, Mar 23, 2019 at 9:32 AM Avi Levi  wrote:
>
>> Any idea what should I do to overcome this?
>>
>> On Wed, Mar 20, 2019 at 7:17 PM Avi Levi  wrote:
>>
>>> Hi Andrey,
>>> I am testing a Filter operator that receives a key from the stream and
>>> checks if it is a new one or not. if it is new it keeps it in state and
>>> fire a timer all that is done using the ProcessFunction.
>>> The testing is using some CollectSink as described here
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing>
>>>  and
>>> the source is implementation of the SourceFunction that accepts a
>>> collection of values and adds it to ctx.collect .
>>> The ctx.timestamp() is null, BUT even if I set the timer to sometime in
>>> the future ctx.timerService.registerProcessingTimeTimer(currenttimestamp +
>>> x) the timer is fired immediately.
>>>
>>>
>>> On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin 
>>> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> what is the structure of your unit test? do you create some source and
>>>> then apply function or you test only ProcessFunction methods in isolation?
>>>> does ctx.timestamp() return zero or which value?
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>>
>>>> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi 
>>>> wrote:
>>>>
>>>>> Hi Andrey ,
>>>>> I'm using IngestionTime
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>>>
>>>>> This is my timer in the processElement:
>>>>>val nextTime: Long = ctx.timestamp()  + daysInMilliseconds(14)
>>>>>ctx.timerService.registerProcessingTimeTimer(nextTime)
>>>>>
>>>>> The problem is how do I use it in my unit tests? Since there is no
>>>>> ingestion time, timers fire immediately, so the timer actions (such as
>>>>> state cleanup) run before their time and cause the tests to fail.

Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

2019-03-23 Thread Avi Levi
Any idea what I should do to overcome this?

On Wed, Mar 20, 2019 at 7:17 PM Avi Levi  wrote:

> Hi Andrey,
> I am testing a Filter operator that receives a key from the stream and
> checks whether it is a new one or not. If it is new, it keeps it in state
> and fires a timer; all of that is done using a ProcessFunction.
> The testing uses a CollectSink as described here
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing>
> and the source is an implementation of SourceFunction that accepts a
> collection of values and adds them via ctx.collect.
> ctx.timestamp() is null, BUT even if I set the timer to some time in
> the future, ctx.timerService.registerProcessingTimeTimer(currentTimestamp +
> x), the timer fires immediately.
>
>
> On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin 
> wrote:
>
>> Hi Avi,
>>
>> what is the structure of your unit test? do you create some source and
>> then apply function or you test only ProcessFunction methods in isolation?
>> does ctx.timestamp() return zero or which value?
>>
>> Best,
>> Andrey
>>
>>
>> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi  wrote:
>>
>>> Hi Andrey ,
>>> I'm using IngestionTime
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>
>>> This is my timer in the processElement:
>>>val nextTime: Long = ctx.timestamp()  + daysInMilliseconds(14)
>>>ctx.timerService.registerProcessingTimeTimer(nextTime)
>>>
>>> The problem is how do I use it in my unit tests? Since there is no
>>> ingestion time, timers fire immediately, so the timer actions (such as
>>> state cleanup) run before their time and cause the tests to fail.
>>>
>>>
>>>
>>>
>>> On Tue, Mar 19, 2019 at 7:47 PM Andrey Zagrebin 
>>> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> do you use processing time timer
>>>> (timerService().registerProcessingTimeTimer)?
>>>> why do you need ingestion time? do you
>>>> set TimeCharacteristic.IngestionTime?
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On Tue, Mar 19, 2019 at 1:11 PM Avi Levi 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Our stream is not based on time sequence and we do not use time based
>>>>> operations. we do want to clean the state after x days hence we fire timer
>>>>> event. My problem is that our unit test fires the event immediately (there
>>>>> is no ingestion time) how can I inject ingestion time ?
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>>


ingesting time for TimeCharacteristic.IngestionTime on unit test

2019-03-19 Thread Avi Levi
Hi,
Our stream is not based on a time sequence and we do not use time-based
operations. We do want to clean the state after x days, hence we fire a timer
event. My problem is that our unit test fires the event immediately (there
is no ingestion time); how can I inject ingestion time?

Cheers
Avi
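
For deterministic tests of processing-time timers, the operator test harness
mentioned above lets the test advance processing time explicitly instead of
relying on wall-clock or ingestion time. A rough Scala sketch, assuming a
KeyedProcessFunction[String, String, String] called DedupFunction that
registers a processing-time timer 14 days ahead (the function name and the
flink-streaming-java test-jar dependency are assumptions):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness

val harness = new KeyedOneInputStreamOperatorTestHarness[String, String, String](
  new KeyedProcessOperator(new DedupFunction()),
  new KeySelector[String, String] { override def getKey(in: String): String = in },
  BasicTypeInfo.STRING_TYPE_INFO)

harness.open()
harness.setProcessingTime(0L)                        // pin the starting processing time
harness.processElement(new StreamRecord[String]("some-key", 0L))

// Nothing fires yet; the timer only triggers once the test moves time forward.
harness.setProcessingTime(14L * 24 * 60 * 60 * 1000 + 1)

val emitted = harness.extractOutputValues()          // records emitted by processElement/onTimer
harness.close()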


Re: Random forest - Flink ML

2019-03-12 Thread Avi Levi
Thanks Flavio,
I will definitely check it out. But from a quick glance, it seems that it
is missing a "random forest" implementation, which is something that we are
looking for.
If anyone can recommend/suggest/share one, that would be greatly appreciated.

Best Regards
Avi


On Mon, Mar 11, 2019 at 10:01 PM Flavio Pompermaier 
wrote:

> I know there's an outgoing promising effort on improving Flink ML in the
> Streamline project [1] but I don't know why it's not very
> considered/advertised.
>
> Best,
> Flavio
>
> [1] https://h2020-streamline-project.eu/apache-flink/
>
> On Mon, 11 Mar 2019 at 15:40, Avi Levi  wrote:
>
>> Hi,
>> According to Till's comment
>> <https://issues.apache.org/jira/browse/FLINK-1728>
>> I understand that flink-ml is going to be ditched. What will be the
>> alternative?
>> Looking for a "random forest" method that we can add to our pipeline
>> (Scala). Any suggestions?
>>
>> Thanks
>> Avi
>>
>>
>>
>>


Random forest - Flink ML

2019-03-11 Thread Avi Levi
Hi,
According to Till's comment
<https://issues.apache.org/jira/browse/FLINK-1728>
I understand that flink-ml is going to be ditched. What will be the
alternative?
Looking for a "random forest" method that we can add to our pipeline
(Scala). Any suggestions?

Thanks
Avi


Re: estimate number of keys on rocks db

2019-03-10 Thread Avi Levi
Thanks Yun,
Attached. Please let me know if it is OK.
I made several attempts, including aggregation functions, but couldn't figure
out why the line is not going straight up and why it has those peaks.



On Sun, Mar 10, 2019 at 4:49 PM Yun Tang  wrote:

> Hi Avi
>
> Unfortunately, we cannot see the attached images. By the way, did you ever
> use window in this job?
>
> Best
> Yun Tang
> ------
> *From:* Avi Levi 
> *Sent:* Sunday, March 10, 2019 19:41
> *To:* user
> *Subject:* estimate number of keys on rocks db
>
> Hi,
> I am trying to estimate number of keys at a given minute.
> I created a graph based on avg_over_time
> <https://prometheus.io/docs/prometheus/latest/querying/functions/#aggregation_over_time>
>  with
> 1hr and 5m interval. looking at the graph you can see that it has high
> spikes which doesn't make sense (IMO) how can the average have those spikes
> ? after all since I do not delete keys I would expect to go up or remain
> the same.
> any ideas what can explain such behaviour ?
> attached are graphs 5m and 1 h intervals
> [image: Screen Shot 2019-03-10 at 13.37.44.png]
> [image: Screen Shot 2019-03-10 at 13.33.40.png]
>
>
>
>


estimate number of keys on rocks db

2019-03-10 Thread Avi Levi
Hi,
I am trying to estimate the number of keys at a given minute.
I created a graph based on avg_over_time
with 1h and 5m intervals. Looking at the graph, you can see that it has high
spikes, which doesn't make sense (IMO); how can the average have those
spikes? After all, since I do not delete keys, I would expect it to go up or
remain the same.
Any ideas what can explain such behaviour?
Attached are the graphs with 5m and 1h intervals.
[image: Screen Shot 2019-03-10 at 13.37.44.png]
[image: Screen Shot 2019-03-10 at 13.33.40.png]


Reading messages from start - new job submission

2019-02-17 Thread Avi Levi
I'm updating a job without a savepoint.
The consumer properties set *  prop.setProperty("auto.offset.reset",
"earliest")*.
The start strategy is not explicitly set (so it uses the default
setStartFromGroupOffsets).
In this case I expect the consumer to read the messages from the beginning,
since there is no offset restored, but it actually reads from the latest
offset. What am I missing?

Best
Avi
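
For reference, a minimal Scala sketch of the relevant consumer setup (topic,
servers and the connector class are placeholders that depend on the Kafka
connector version in use). auto.offset.reset only applies when the group has
no committed offsets in Kafka; with the default setStartFromGroupOffsets,
existing committed offsets win, so forcing a read from the beginning needs
setStartFromEarliest():

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "my-group")
props.setProperty("auto.offset.reset", "earliest") // only used when no group offsets exist

val consumer = new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props)
consumer.setStartFromEarliest() // ignore committed group offsets and read from the beginning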


Production readiness

2019-02-13 Thread Avi Levi
Hi,
Looking at the production readiness
checklist - is there any rule of thumb for determining the maximum
parallelism? We have a stateful pipeline with high throughput (4k
requests/sec) running on Google Cloud (YARN).
I understood that if we do not set it, the default is 128 (but that may
change in the future); however, once we set it, it cannot be changed later -
correct?

Is there any way to get info on the state (RocksDB), e.g. the number of keys
or a list of keys?

Regards
Avi
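
On the max-parallelism point, a short Scala sketch of setting it explicitly
(the values are just an illustration): the max parallelism bounds the number
of key groups and effectively cannot be raised later without losing keyed
state, so it should be chosen with generous headroom over the expected
parallelism.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(100)      // current operator parallelism
env.setMaxParallelism(720)   // upper bound for future rescaling; fixed for the life of the keyed state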


Re: getting duplicate messages from duplicate jobs

2019-01-30 Thread Avi Levi
OK, if you guys think it should be like that, then so be it. All I am
saying is that it is not standard behaviour for a Kafka consumer, at least
according to the documentation
<https://kafka.apache.org/documentation/#intro_consumers>. I understand
that Flink implements things differently; all I am saying is that this is
not standard Kafka consumer-group behaviour.


On Tue, Jan 29, 2019 at 9:47 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> Yes, Dawid is correct.
>
> The "group.id" setting in Flink's Kafka Consumer is only used for group offset fetching
> and committing offsets back to Kafka (only for exposure purposes, not used
> for processing guarantees).
> The Flink Kafka Consumer uses static partition assignment on the
> KafkaConsumer API, and not consumer group-based automatic partition
> assignments.
>
> Cheers,
> Gordon
>
> On Sun, Jan 27, 2019 at 12:28 AM Dawid Wysakowicz 
> wrote:
>
>> Forgot to cc Gordon :)
>>
>> On 23/01/2019 18:02, Avi Levi wrote:
>> > Hi,
>> > This quite confusing.
>> > I submitted the same stateless job twice (actually I upload it once).
>> > However when I place a message on kafka, it seems that both jobs
>> > consumes it, and publish the same result (we publish the result to
>> > other kafka topic, so I actually see the massage duplicated on kafka
>> > ). how can it be ? both jobs are using the same group id (group id is
>> > fixed and not generated )
>> >
>> > Kind regards
>> > Avi
>>
>>


getting duplicate messages from duplicate jobs

2019-01-23 Thread Avi Levi
Hi,
This is quite confusing.
I submitted the same stateless job twice (actually I uploaded it once).
However, when I place a message on Kafka, it seems that both jobs consume
it and publish the same result (we publish the result to another Kafka
topic, so I actually see the message duplicated on Kafka). How can that be?
Both jobs are using the same group id (the group id is fixed, not generated).

Kind regards
Avi


Re: Getting RemoteTransportException, HA mode

2019-01-20 Thread Avi Levi
Thanks! Regarding HA: so the job will auto-recover after a crash,
understood. Is that also true when deploying a new version? Is it as easy as
simply canceling one job, updating the version, and once it is up & running
doing the same to the other one? Is there anything that should be
highlighted when working in HA mode (e.g. configuration or machine setup,
besides what is written in the JobManager HA documentation
<https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html>
)?

On Thu, Jan 17, 2019 at 9:39 PM Jamie Grier  wrote:

> Avi,
>
> The stack trace there is pretty much a red herring.  That happens whenever
> a job shuts down for any reason and is not a root cause.  To diagnose this
> you will want to look at all the TaskManager logs as well as the JobManager
> logs.  If you have a way to easily grep these (all of them at once) I would
> search for a string like "to FAILED" on the taskmanagers and look at those
> error lines and stacktraces.
>
> Don't be misled by the exception reported in the Flink UI.  It OFTEN isn't
> the true root cause but it's a hard problem to solve.  You have to look at
> the TaskManager logs to really be sure.
>
> The taskmanager.network.netty.server.numThreads
> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-netty-server-numthreads>
>  is
> also a red herring.  I would leave that alone.
>
> Finally, if you have HA and checkpointing setup correctly you will not
> lose any state even in the case of losing a JobManager.  The job will
> auto-recover as soon as a new JobManager becomes available.
>
> I hope that helps.
>
> -Jamie
>
>
>
> On Thu, Jan 17, 2019 at 7:10 AM Dominik Wosiński  wrote:
>
>> *Hey,*
>> As for the question about  taskmanager.network.netty.server.numThreads
>> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-netty-server-numthreads>.
>> It is the size of the thread pool that will be used by the netty server.
>> The default value is -1, which will result in the thread pool with size
>> equal to the number of task slots for your JobManager.
>>
>> Best Regards,
>> Dom.
>>
>> On Thu, 17 Jan 2019 at 00:52, Avi Levi  wrote:
>>
>>> Hi Guys,
>>>
>>> We done some load tests and we got the exception below, I saw that the
>>> JobManager was restarted, If I understood correctly, it will get new job id
>>> and the state will lost - is that correct? how the state is handled setting
>>> HA as described here
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html>,
>>>  what
>>> actually happens to the state if one of the job manager crashes (keyed
>>> state using rocks db) ?
>>>
>>>
>>> One of the property that might be relevant to this exception is
>>> taskmanager.network.netty.server.numThreads
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-netty-server-numthreads>
>>>  with
>>> a default value of -1 - what is this default value actually means?  and
>>> should it be set to different value according to #cores?
>>>
>>>
>>> Thanks for your advice .
>>>
>>> Avi
>>>
>>>
>>>
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Lost connection to task manager ':1234'. This indicates that the remote
>>> task manager was lost.
>>>
>>> at
>>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:160)
>>>
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>>>
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>>>
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
>>>
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>>>
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>>>

Getting RemoteTransportException

2019-01-16 Thread Avi Levi
Hi Guys,

We ran some load tests and got the exceptions below. I saw that the
JobManager was restarted; if I understood correctly, it will get a new job
id and the state will be lost - is that correct? How is the state handled
when setting up HA as described here
<https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html>,
and what actually happens to the state if one of the job managers crashes
(keyed state using RocksDB)?


One of the properties that might be relevant to this exception is
taskmanager.network.netty.server.numThreads
<https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-netty-server-numthreads>,
with a default value of -1 - what does this default value actually mean,
and should it be set to a different value according to the number of cores?


Thanks for your advice .

Avi



org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Lost connection to task manager ':1234'. This indicates that the remote
task manager was lost.

at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:160)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)

at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)

at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)

at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)

at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)

at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:953)

at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)

at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)

at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)

at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)

at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)

at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)

at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

at sun.nio.ch.IOUtil.read(IOUtil.java:192)

at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

at
org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)

at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)

at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)

at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)

... 6 more
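
As a follow-up to the reply above about HA plus checkpointing, a minimal Scala
sketch of the checkpointing side (interval and checkpoint path are
placeholders); with this in place plus JobManager HA, keyed RocksDB state can
be restored after a JobManager failure:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60 * 1000) // checkpoint every 60s

// Durable, incremental RocksDB checkpoints on a shared filesystem.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))

// Keep the latest checkpoint even if the job is cancelled, so it can be used to restore.
env.getCheckpointConfig.enableExternalizedCheckpoints(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)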

Re: Passing vm options

2019-01-08 Thread Avi Levi
Got it. Thanks

On Mon, Jan 7, 2019 at 5:32 PM Dominik Wosiński  wrote:

> Hey,
> AFAIK, Flink supports dynamic properties currently only on YARN and not
> really in standalone mode.
> If You are using YARN it should indeed be possible to set such
> configuration. If not, then I am afraid it is not possible.
>
> Best Regards,
> Dom.
>
>
> On Mon, 7 Jan 2019 at 09:01, Avi Levi  wrote:
>
>> Hi ,
>> I am trying to pass some vm options e.g
>> bin/flink run foo-assembly-0.0.1-SNAPSHOT.jar
>> -Dflink.stateDir=file:///tmp/ -Dkafka.bootstrap.servers="localhost:9092"
>> -Dkafka.security.ssl.enabled=false
>> but it doesn't seem to override the values in application.conf . Am I
>> missing something?
>> BTW is it possible to pass config file using -Dcofig.file ?
>>
>> BR
>> Avi
>>
>


Passing vm options

2019-01-07 Thread Avi Levi
Hi,
I am trying to pass some VM options, e.g.
bin/flink run foo-assembly-0.0.1-SNAPSHOT.jar -Dflink.stateDir=file:///tmp/
-Dkafka.bootstrap.servers="localhost:9092"
-Dkafka.security.ssl.enabled=false
but they don't seem to override the values in application.conf. Am I
missing something?
BTW, is it possible to pass a config file using -Dconfig.file?

BR
Avi
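
A possible workaround in Scala when dynamic -D properties are not available:
pass the overrides as program arguments and merge them into the Typesafe
config at startup (the ConfigFactory usage and key names are assumptions
about how the job loads application.conf):

import com.typesafe.config.ConfigFactory
import org.apache.flink.api.java.utils.ParameterTool

def loadConfig(args: Array[String]) = {
  // e.g. bin/flink run foo.jar --kafka.bootstrap.servers localhost:9092 --flink.stateDir file:///tmp/
  val params = ParameterTool.fromArgs(args)
  // Program arguments take precedence over application.conf defaults.
  ConfigFactory.parseMap(params.toMap).withFallback(ConfigFactory.load()).resolve()
}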


Re: using updating shared data

2019-01-06 Thread Avi Levi
Sounds like a good idea, because in the control stream the time doesn't
really matter. Thanks !!!

On Fri, Jan 4, 2019 at 11:13 AM David Anderson 
wrote:

> Another solution to the watermarking issue is to write an
> AssignerWithPeriodicWatermarks for the control stream that always returns
> Watermark.MAX_WATERMARK as the current watermark. This produces watermarks
> for the control stream that will effectively be ignored.
>
> On Thu, Jan 3, 2019 at 9:18 PM Avi Levi  wrote:
>
>> Thanks for the tip Elias!
>>
>> On Wed, Jan 2, 2019 at 9:44 PM Elias Levy 
>> wrote:
>>
>>> One thing you must be careful of, is that if you are using event time
>>> processing, assuming that the control stream will only receive messages
>>> sporadically, is that event time will stop moving forward in the operator
>>> joining the streams while the control stream is idle.  You can get around
>>> this by using a periodic watermark extractor one the control stream that
>>> bounds the event time delay to processing time or by defining your own low
>>> level operator that ignores watermarks from the control stream.
>>>
>>> On Wed, Jan 2, 2019 at 8:42 AM Avi Levi  wrote:
>>>
>>>> Thanks Till I will defiantly going to check it. just to make sure that
>>>> I got you correctly. you are suggesting the the list that I want to
>>>> broadcast will be broadcasted via control stream and it will be than be
>>>> kept in the relevant operator state correct ? and updates (CRUD) on that
>>>> list will be preformed via the control stream. correct ?
>>>> BR
>>>> Avi
>>>>
>>>> On Wed, Jan 2, 2019 at 4:28 PM Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Hi Avi,
>>>>>
>>>>> you could use Flink's broadcast state pattern [1]. You would need to
>>>>> use the DataStream API but it allows you to have two streams (input and
>>>>> control stream) where the control stream is broadcasted to all sub tasks.
>>>>> So by ingesting messages into the control stream you can send model 
>>>>> updates
>>>>> to all sub tasks.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jan 1, 2019 at 6:49 PM miki haiat  wrote:
>>>>>
>>>>>> Im trying to understand  your  use case.
>>>>>> What is the source  of the data ? FS ,KAFKA else ?
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 1, 2019 at 6:29 PM Avi Levi 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I have a list (couple of thousands text lines) that I need to use in
>>>>>>> my map function. I read this article about broadcasting variables
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables>
>>>>>>>  or
>>>>>>> using distributed cache
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#distributed-cache>
>>>>>>> however I need to update this list from time to time, and if I 
>>>>>>> understood
>>>>>>> correctly it is not possible on broadcast or cache without restarting 
>>>>>>> the
>>>>>>> job. Is there idiomatic way to achieve this? A db seems to be an 
>>>>>>> overkill
>>>>>>> for that and I do want to be cheap on io/network calls as much as 
>>>>>>> possible.
>>>>>>>
>>>>>>> Cheers
>>>>>>> Avi
>>>>>>>
>>>>>>>
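
A tiny Scala sketch of the suggestion quoted above (the type parameter and the
usage line are illustrative): a periodic watermark assigner for the control
stream that always reports MAX_WATERMARK, so the rarely-updated control stream
never holds event time back on the connected operator.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class ControlStreamWatermarks[T] extends AssignerWithPeriodicWatermarks[T] {
  // Always claim event time has advanced as far as possible on this stream.
  override def getCurrentWatermark: Watermark = Watermark.MAX_WATERMARK
  // Timestamps on control records are irrelevant for the join.
  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = Long.MinValue
}

// usage: controlStream.assignTimestampsAndWatermarks(new ControlStreamWatermarks[ControlMessage])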


Re: using updating shared data

2019-01-03 Thread Avi Levi
Thanks for the tip Elias!

On Wed, Jan 2, 2019 at 9:44 PM Elias Levy 
wrote:

> One thing you must be careful of, is that if you are using event time
> processing, assuming that the control stream will only receive messages
> sporadically, is that event time will stop moving forward in the operator
> joining the streams while the control stream is idle.  You can get around
> this by using a periodic watermark extractor one the control stream that
> bounds the event time delay to processing time or by defining your own low
> level operator that ignores watermarks from the control stream.
>
> On Wed, Jan 2, 2019 at 8:42 AM Avi Levi  wrote:
>
>> Thanks Till I will defiantly going to check it. just to make sure that I
>> got you correctly. you are suggesting the the list that I want to broadcast
>> will be broadcasted via control stream and it will be than be kept in the
>> relevant operator state correct ? and updates (CRUD) on that list will be
>> preformed via the control stream. correct ?
>> BR
>> Avi
>>
>> On Wed, Jan 2, 2019 at 4:28 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Avi,
>>>
>>> you could use Flink's broadcast state pattern [1]. You would need to use
>>> the DataStream API but it allows you to have two streams (input and control
>>> stream) where the control stream is broadcasted to all sub tasks. So by
>>> ingesting messages into the control stream you can send model updates to
>>> all sub tasks.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jan 1, 2019 at 6:49 PM miki haiat  wrote:
>>>
>>>> Im trying to understand  your  use case.
>>>> What is the source  of the data ? FS ,KAFKA else ?
>>>>
>>>>
>>>> On Tue, Jan 1, 2019 at 6:29 PM Avi Levi 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I have a list (couple of thousands text lines) that I need to use in
>>>>> my map function. I read this article about broadcasting variables
>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables>
>>>>>  or
>>>>> using distributed cache
>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#distributed-cache>
>>>>> however I need to update this list from time to time, and if I understood
>>>>> correctly it is not possible on broadcast or cache without restarting the
>>>>> job. Is there idiomatic way to achieve this? A db seems to be an overkill
>>>>> for that and I do want to be cheap on io/network calls as much as 
>>>>> possible.
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>>


Re: using updating shared data

2019-01-02 Thread Avi Levi
Thanks Till, I am definitely going to check it out. Just to make sure that
I got you correctly: you are suggesting that the list I want to broadcast
will be broadcasted via a control stream and will then be kept in the
relevant operator state, correct? And updates (CRUD) on that list will be
performed via the control stream, correct?
BR
Avi

On Wed, Jan 2, 2019 at 4:28 PM Till Rohrmann  wrote:

> Hi Avi,
>
> you could use Flink's broadcast state pattern [1]. You would need to use
> the DataStream API but it allows you to have two streams (input and control
> stream) where the control stream is broadcasted to all sub tasks. So by
> ingesting messages into the control stream you can send model updates to
> all sub tasks.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> Cheers,
> Till
>
> On Tue, Jan 1, 2019 at 6:49 PM miki haiat  wrote:
>
>> Im trying to understand  your  use case.
>> What is the source  of the data ? FS ,KAFKA else ?
>>
>>
>> On Tue, Jan 1, 2019 at 6:29 PM Avi Levi  wrote:
>>
>>> Hi,
>>> I have a list (couple of thousands text lines) that I need to use in my
>>> map function. I read this article about broadcasting variables
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables>
>>>  or
>>> using distributed cache
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#distributed-cache>
>>> however I need to update this list from time to time, and if I understood
>>> correctly it is not possible on broadcast or cache without restarting the
>>> job. Is there idiomatic way to achieve this? A db seems to be an overkill
>>> for that and I do want to be cheap on io/network calls as much as possible.
>>>
>>> Cheers
>>> Avi
>>>
>>>


using updating shared data

2019-01-01 Thread Avi Levi
Hi,
I have a list (a couple of thousand text lines) that I need to use in my map
function. I read this article about broadcasting variables
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables>
or using the distributed cache
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#distributed-cache>;
however, I need to update this list from time to time, and if I understood
correctly that is not possible with broadcast variables or the cache without
restarting the job. Is there an idiomatic way to achieve this? A DB seems to
be overkill for that, and I want to be as cheap on IO/network calls as
possible.

Cheers
Avi
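
A rough Scala sketch of the broadcast-state approach suggested in the replies
above (stream sources, element types and the lookup logic are placeholders):
the shared list travels on a control stream, is broadcast to all subtasks and
kept in broadcast state, so additions and removals are just more control
messages.

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[String]  = env.socketTextStream("localhost", 9000) // main input (placeholder)
val control: DataStream[String] = env.socketTextStream("localhost", 9001) // list updates (placeholder)

val linesDescriptor = new MapStateDescriptor[String, String](
  "shared-lines", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

val result = events
  .connect(control.broadcast(linesDescriptor))
  .process(new BroadcastProcessFunction[String, String, String] {

    override def processElement(
        value: String,
        ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext,
        out: Collector[String]): Unit = {
      // The data side sees a read-only view of the broadcast state.
      if (!ctx.getBroadcastState(linesDescriptor).contains(value)) out.collect(value)
    }

    override def processBroadcastElement(
        line: String,
        ctx: BroadcastProcessFunction[String, String, String]#Context,
        out: Collector[String]): Unit = {
      // Every control message updates the shared list on all parallel subtasks.
      ctx.getBroadcastState(linesDescriptor).put(line, line)
    }
  })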


Re: getting Timeout expired while fetching topic metadata

2018-12-24 Thread Avi Levi
Thanks Miki,
we had a bug in the certs; however, the property you suggested gave us
better logs, so it really cleared things up. THANKS !!!

On Mon, Dec 24, 2018 at 8:45 PM miki haiat  wrote:

> Hi Avi,
> Can you try to add this properties
>
>  props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
>
> Thanks,
> Miki
>
> On Mon, Dec 24, 2018 at 8:19 PM Avi Levi  wrote:
>
>> Hi all,
>> very new to flink so my apology if it seems trivial.
>> We deployed flink on gcloud
>> I am trying to connect to kafka but keep getting this error:
>> *org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>> fetching topic metadata*
>> this how my properties look like
>> val consumerProperties: Properties = {
>> val p = new Properties()
>> p.setProperty("bootstrap.servers", kafkaBootStrapServers)
>> p.setProperty("group.id", groupId)
>> p.setProperty("client.id",
>> s"queue-consumer-${randomUUID().toString}")
>>
>> p.setProperty("ssl.keystore.location","/usr/path_to/kafka_ssl_client.keystore.jks"))
>> p.setProperty("ssl.keystore.password",  "some password")
>> p.setProperty("ssl.truststore.location",
>> "/usr/path_to/kafka_ssl_client.keystore.jks")
>> p.setProperty("ssl.truststore.password", "some password")
>> p
>>   }
>>
>> please advise
>>
>> Thanks
>> Avi
>>
>


getting Timeout expired while fetching topic metadata

2018-12-24 Thread Avi Levi
Hi all,
very new to Flink, so my apologies if this seems trivial.
We deployed Flink on gcloud.
I am trying to connect to Kafka but keep getting this error:
*org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata*
This is how my properties look:
val consumerProperties: Properties = {
  val p = new Properties()
  p.setProperty("bootstrap.servers", kafkaBootStrapServers)
  p.setProperty("group.id", groupId)
  p.setProperty("client.id", s"queue-consumer-${randomUUID().toString}")
  p.setProperty("ssl.keystore.location", "/usr/path_to/kafka_ssl_client.keystore.jks")
  p.setProperty("ssl.keystore.password", "some password")
  p.setProperty("ssl.truststore.location", "/usr/path_to/kafka_ssl_client.keystore.jks")
  p.setProperty("ssl.truststore.password", "some password")
  p
}

please advise

Thanks
Avi


Re: getting an error when configuring state backend to hdfs

2018-12-24 Thread Avi Levi
Thanks!
My apologies for the late response - all good advice.
I put the flink-hadoop-fs jar in /lib as Chesnay suggested, and from the IDE
I simply use file:// as Yun suggested.

On Mon, Dec 24, 2018 at 6:32 AM Yun Tang  wrote:

> Hi Avi
>
> For application running in your IDE, please set the checkpoint path schema
> as "file://", you could refer to source code of ITcases using
> rocksDBStateBackend.
>
> For application running in your cluster, please choose Flink with Hadoop
> to download, or choose Flink without hadoop and export your
> HADOOP_CLASSPATH [1]
>
> [1] https://flink.apache.org/downloads.html#latest-stable-release-v171
>
>
> Best
> Yun Tang
> --
> *From:* Avi Levi 
> *Sent:* Thursday, December 20, 2018 2:11
> *To:* Steven Nelson
> *Cc:* Chesnay Schepler; user@flink.apache.org
> *Subject:* Re: getting an error when configuring state backend to hdfs
>
> when I try running from my IDE (intellij) I am getting this exception
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Could not retrieve
> JobResult.
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at
> com.bluevoyant.StreamingJob$.delayedEndpoint$com$bluevoyant$StreamingJob$1(StreamingJob.scala:41)
> at
> com.bluevoyant.StreamingJob$delayedInit$body.apply(StreamingJob.scala:15)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at com.bluevoyant.StreamingJob$.main(StreamingJob.scala:15)
> at com.bluevoyant.StreamingJob.main(StreamingJob.scala)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit job.
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala

Connecting to kafka with tls

2018-12-23 Thread Avi Levi
Hi,
Can anyone give me an example of how to use the Kafka client with TLS
support? I must use TLS to connect to our Kafka.

Thanks in advance
Avi
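
For reference, a minimal Scala sketch of a TLS-enabled consumer config (paths,
passwords, topic and brokers are placeholders). Note that security.protocol
has to be set to SSL, otherwise the client talks PLAINTEXT to a TLS listener
and typically times out fetching metadata:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "broker-1:9093")
props.setProperty("group.id", "my-group")

// Without this the client attempts a PLAINTEXT connection to the TLS port.
props.setProperty("security.protocol", "SSL")
props.setProperty("ssl.truststore.location", "/path/to/client.truststore.jks")
props.setProperty("ssl.truststore.password", "changeit")
// Keystore entries are only needed when brokers require client (mutual TLS) authentication.
props.setProperty("ssl.keystore.location", "/path/to/client.keystore.jks")
props.setProperty("ssl.keystore.password", "changeit")

val source = new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props)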


Re: getting an error when configuring state backend to hdfs

2018-12-19 Thread Avi Levi
aster.java:1146)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'hdfs'. The scheme
is not directly supported by Flink and no Hadoop file system to support
this scheme could be loaded.
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.
at
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more


On Wed, Dec 19, 2018 at 6:50 PM Steven Nelson 
wrote:

> What image are you using?
>
> Sent from my iPhone
>
> On Dec 19, 2018, at 9:44 AM, Avi Levi  wrote:
>
> Hi Chesnay,
> What do you mean? I am creating a fat jar with all dependencies (using sbt
> assembly). which jar I should place in the /lib directory ?
>
> On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler 
> wrote:
>
>> Are you including the filesystems in your jar? Filesystem jars must be
>> placed in the /lib directory of the flink distribution.
>>
>> On 19.12.2018 15:03, Avi Levi wrote:
>>
>> Hi,
>> I am trying to set the backend state to hdfs
>> *val stateUri = "hdfs/path_to_dir"*
>> *val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri,
>> true)*
>>
>> *env.setStateBackend(backend) *
>>
>> I am running with flink 1.7.0 with the following dependencies (tried them
>> with different combinations)  :
>> *"org.apache.flink"%% "flink-connector-filesystem" % flinkV*
>> *"org.apache.flink"% "flink-hadoop-fs" % flinkV*
>> *"org.apache.hadoop"   % "hadoop-hdfs" %
>> hadoopVersion*
>>
>>
>> *"org.apache.hadoop"   % "hadoop-common"   %
>> hadoopVersion *
>> *however when running the jar I am getting this error:*
>>
>> *Caused by:
>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>> find a file system implementation for scheme 'hdfs'. The scheme is not
>> directly supported by Flink and no Hadoop file system to support this
>> scheme could be loaded.*
>> * at
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)*
>> * at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)*
>> * at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)*
>> * at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)*
>> * at
>> org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)*
>> * at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)*
>> * ... 17 more*
>> *Caused by:
>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
>> not in the classpath/dependencies.*
>> * at
>> org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)*
>> * at
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)*
>> * ... 23 more*
>>
>> any help will be greatly appreciated
>>
>>
>>
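
A build.sbt sketch of a dependency layout that avoids this class of error
(artifact names and versions are illustrative): keep Hadoop/HDFS support out
of the fat jar and let the cluster provide it, via a Flink-with-Hadoop
distribution, HADOOP_CLASSPATH, or a filesystem jar dropped into Flink's /lib
as was eventually done in this thread.

// build.sbt (sketch)
val flinkVersion = "1.7.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion % Provided,
  "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,
  // HDFS support comes from the Flink distribution / HADOOP_CLASSPATH, not from the fat jar:
  "org.apache.flink" %  "flink-hadoop-fs"            % flinkVersion % Provided
)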


Re: getting an error when configuring state backend to hdfs

2018-12-19 Thread Avi Levi
Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using sbt
assembly). Which jar should I place in the /lib directory?

On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler  wrote:

> Are you including the filesystems in your jar? Filesystem jars must be
> placed in the /lib directory of the flink distribution.
>
> On 19.12.2018 15:03, Avi Levi wrote:
>
> Hi,
> I am trying to set the backend state to hdfs
> *val stateUri = "hdfs/path_to_dir"*
> *val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri,
> true)*
>
> *env.setStateBackend(backend) *
>
> I am running with flink 1.7.0 with the following dependencies (tried them
> with different combinations)  :
> *"org.apache.flink"%% "flink-connector-filesystem" % flinkV*
> *"org.apache.flink"% "flink-hadoop-fs" % flinkV*
> *"org.apache.hadoop"   % "hadoop-hdfs" %
> hadoopVersion*
>
>
> *"org.apache.hadoop"   % "hadoop-common"   %
> hadoopVersion *
> *however when running the jar I am getting this error:*
>
> *Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded.*
> * at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)*
> * at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)*
> * at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)*
> * at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)*
> * at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)*
> * at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)*
> * ... 17 more*
> *Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.*
> * at
> org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)*
> * at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)*
> * ... 23 more*
>
> any help will be greatly appreciated
>
>
>


getting an error when configuring state backend to hdfs

2018-12-19 Thread Avi Levi
Hi,
I am trying to set the state backend to hdfs
*val stateUri = "hdfs/path_to_dir"*
*val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)*

*env.setStateBackend(backend)*

I am running with flink 1.7.0 with the following dependencies (tried them
with different combinations)  :
*"org.apache.flink"%% "flink-connector-filesystem" % flinkV*
*"org.apache.flink"% "flink-hadoop-fs" % flinkV*
*"org.apache.hadoop"   % "hadoop-hdfs" %
hadoopVersion*


*"org.apache.hadoop"   % "hadoop-common"   %
hadoopVersion*
*however when running the jar I am getting this error:*

*Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'hdfs'. The scheme
is not directly supported by Flink and no Hadoop file system to support
this scheme could be loaded.*
* at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)*
* at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)*
* at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)*
* at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)*
* at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)*
* at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)*
* at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)*
* ... 17 more*
*Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.*
* at
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)*
* at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)*
* ... 23 more*

any help will be greatly appreciated


Re: Trying to write to parquet file (kafka as a source) yields thousands of "in progress" files

2018-12-10 Thread Avi Levi
Got it, my bad. I should have used a bucket assigner. This seems to be working fine:

StreamingFileSink.forBulkFormat[Request](
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Request]))
  .withBucketAssigner(new DateTimeBucketAssigner[Request]())
  .withBucketCheckInterval(5000L)
  .build()

On Sun, Dec 9, 2018 at 2:13 PM Avi Levi  wrote:

> Hi,
> I am trying to read from kafka and write to parquet. But I am getting
> thousands of ".part-0-0in progress..." files (and counting ...)
> is that a bug or am I doing something wrong?
>
> object StreamParquet extends App {
>   implicit val env: StreamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment
>   env.enableCheckpointing(100)
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>   env.getCheckpointConfig.setCheckpointTimeout(600)
>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   env.setParallelism(1)
> val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC, new
> AddressSchema(), consumerProperties)
>   val stream: DataStreamSource[Address] = env.addSource(QueueImpl.consumer)
>   val outputPath = "streaming_files"
>   val sink = StreamingFileSink.forBulkFormat(
> new Path(outputPath),
> ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
>   stream.addSink(sink)
>   env.execute("Write to file")
> }
>
>
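Two details in the quoted snippet interact here: with bulk formats the in-progress part file is rolled on every checkpoint (as Kostas notes in the neighbouring thread), and this job checkpoints every 100 ms, so new part files appear many times per second. Below is a sketch of the same sink with a bucket assigner and a more conservative interval; env, stream, outputPath and Address are the names from the quoted snippet, and the 60 s value is an arbitrary assumption.

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner

// each completed checkpoint finalises one part file per bucket
env.enableCheckpointing(60000)

val sink = StreamingFileSink
  .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(classOf[Address]))
  .withBucketAssigner(new DateTimeBucketAssigner[Address]()) // one bucket directory per hour by default
  .build()

stream.addSink(sink)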


Trying to write to parquet file (kafka as a source) yields thousands of "in progress" files

2018-12-09 Thread Avi Levi
Hi,
I am trying to read from kafka and write to parquet. But I am getting
thousands of ".part-0-0in progress..." files (and counting ...)
is that a bug or am I doing something wrong?

object StreamParquet extends App {
  implicit val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(100)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setParallelism(1)
val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC, new
AddressSchema(), consumerProperties)
  val stream: DataStreamSource[Address] = env.addSource(QueueImpl.consumer)
  val outputPath = "streaming_files"
  val sink = StreamingFileSink.forBulkFormat(
new Path(outputPath),
ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
  stream.addSink(sink)
  env.execute("Write to file")
}


Re: Stream in loop and not getting to sink (Parquet writer )

2018-12-03 Thread Avi Levi
Thanks Kostas,
Ok, got it, so BucketingSink might not be a good choice here. Can you please
advise what the best approach would be? I have a heavy load of data that I
consume from Kafka and want to process and write to a file (it doesn't
have to be Parquet). I thought that StreamingFileSink might be a good
choice, but I guess I am doing something wrong there. If there is a good
example for that, it would be great.

BR
Avi

On Mon, Dec 3, 2018 at 4:11 PM Kostas Kloudas 
wrote:

> Hi Avi,
>
> For Bulk Formats like Parquet, unfortunately, we do not support setting
> the batch size.
> The part-files roll on every checkpoint. This is a known limitation and
> there are plans to
> alleviate it in the future.
>
> Setting the batch size (among other things) is supported for RowWise
> formats.
>
> Cheers,
> Kostas
>
> On Sun, Dec 2, 2018 at 9:29 PM Avi Levi  wrote:
>
>> Thanks Kostas. I will definitely look into that. but is the
>> StreamingFileSink also support setting the batch size by size and/or by
>> time interval like bucketing sink ?
>>
>> On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Avi,
>>>
>>> The ParquetAvroWriters cannot be used with the BucketingSink.
>>>
>>> In fact the StreamingFIleSink is the "evolution" of the BucketingSink
>>> and it supports
>>> all the functionality that the BucketingSink supports.
>>>
>>> Given this, why not using the StreamingFileSink?
>>>
>>> On Sat, Dec 1, 2018 at 7:56 AM Avi Levi  wrote:
>>>
>>>> Thanks looks good.
>>>> Do you know a way to use PaquetWriter or ParquetAvroWriters with a 
>>>> BucketingSink
>>>> file
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>
>>>> ? something like :
>>>>
>>>> val bucketingSink = new BucketingSink[String]("/base/path")
>>>> bucketingSink.setBucketer(new DateTimeBucketer[String]("-MM-dd--HHmm"))
>>>> bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
>>>> bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
>>>> bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
>>>>
>>>>
>>>> On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <
>>>> k.klou...@data-artisans.com> wrote:
>>>>
>>>>> And for a Java example which is actually similar to your pipeline,
>>>>> you can check the ParquetStreamingFileSinkITCase.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi Avi,
>>>>>>
>>>>>> At a first glance I am not seeing anything wrong with your code.
>>>>>> Did you verify that there are elements flowing in your pipeline and
>>>>>> that checkpoints are actually completed?
>>>>>> And also can you check the logs at Job and Task Manager for anything
>>>>>> suspicious?
>>>>>>
>>>>>> Unfortunately, we do not allow specifying encoding and other
>>>>>> parameters to your writer, which is an omission
>>>>>> on our part and this should be fixed. Could you open a JIRA for that?
>>>>>>
>>>>>> If you want to know more about Flink's Parquet-Avro writer, feel free
>>>>>> to have a look at the ParquetAvroWriters
>>>>>> class.
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi 
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks a lot Kostas, but the file not created . what am I doing
>>>>>>> wrong?
>>>>>>> BTW how can you set the encoding etc' in Flink's Avro - Parquet
>>>>>>> writer?
>>>>>>>
>>>>>>> object Tester extends App {
>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>   
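Since the row-wise path is only mentioned in passing above: for row-encoded output the rolling can be driven by part size and time, which is the closest equivalent to BucketingSink's setBatchSize / setBatchRolloverInterval. A sketch under the assumption that plain text lines are acceptable; the path and thresholds are illustrative only.

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val rowSink = StreamingFileSink
  .forRowFormat(new Path("/base/path"), new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(
    DefaultRollingPolicy.create()
      .withMaxPartSize(400L * 1024 * 1024)    // roll at ~400 MB, mirroring the BucketingSink example
      .withRolloverInterval(20 * 60 * 1000L)  // or after 20 minutes
      .withInactivityInterval(5 * 60 * 1000L) // or after 5 minutes without new records
      .build())
  .build()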

Looking for example for bucketingSink / StreamingFileSink

2018-12-03 Thread Avi Levi
Hi Guys,
I'm very new to Flink, so my apologies for the newbie questions :)
but I am desperately looking for a good example of streaming to a file
using BucketingSink / StreamingFileSink. Unfortunately the examples in the
documentation are not even compiling (at least not the ones in Scala,
https://issues.apache.org/jira/browse/FLINK-11053 )

I tried using bucketing sink with streamingFileSink (or just
streamingFileSink ) and finally tried to implement a writer but with no
luck.
BucketingSink seems to be a perfect fit because I can set the batch
interval by time interval or size which is exactly what I need.

This is my last attempt (sample project),
which results in a lot of "pending" files.
*Any help would be appreciated*

*Thanks*
*Avi*
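For reference, a self-contained sketch of about the smallest job that exercises StreamingFileSink with a bulk (Parquet) format. The Record case class, path and interval are assumptions; note that with a finite source like fromElements the job can finish before the first checkpoint completes, leaving only in-progress parts, whereas with a long-running Kafka source each completed checkpoint finalises them.

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object MinimalParquetJob extends App {
  case class Record(id: String, ts: Long)

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(30000) // bulk parts are committed only when a checkpoint completes

  val input: DataStream[Record] =
    env.fromElements(Record("a", 1L), Record("b", 2L)) // stand-in for a real Kafka source

  val sink = StreamingFileSink
    .forBulkFormat(new Path("/tmp/streaming_files"), ParquetAvroWriters.forReflectRecord(classOf[Record]))
    .build()

  input.addSink(sink)
  env.execute("minimal StreamingFileSink example")
}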


Re: Stream in loop and not getting to sink (Parquet writer )

2018-12-02 Thread Avi Levi
Thanks Kostas. I will definitely look into that. But does the
StreamingFileSink also support setting the batch size by size and/or by
time interval like the bucketing sink?

On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas 
wrote:

> Hi Avi,
>
> The ParquetAvroWriters cannot be used with the BucketingSink.
>
> In fact the StreamingFileSink is the "evolution" of the BucketingSink and
> it supports
> all the functionality that the BucketingSink supports.
>
> Given this, why not use the StreamingFileSink?
>
> On Sat, Dec 1, 2018 at 7:56 AM Avi Levi  wrote:
>
>> Thanks looks good.
>> Do you know a way to use PaquetWriter or ParquetAvroWriters with a 
>> BucketingSink
>> file
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>
>> ? something like :
>>
>> val bucketingSink = new BucketingSink[String]("/base/path")
>> bucketingSink.setBucketer(new DateTimeBucketer[String]("-MM-dd--HHmm"))
>> bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
>> bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
>> bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
>>
>>
>> On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> And for a Java example which is actually similar to your pipeline,
>>> you can check the ParquetStreamingFileSinkITCase.
>>>
>>>
>>>
>>> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> At a first glance I am not seeing anything wrong with your code.
>>>> Did you verify that there are elements flowing in your pipeline and
>>>> that checkpoints are actually completed?
>>>> And also can you check the logs at Job and Task Manager for anything
>>>> suspicious?
>>>>
>>>> Unfortunately, we do not allow specifying encoding and other parameters
>>>> to your writer, which is an omission
>>>> on our part and this should be fixed. Could you open a JIRA for that?
>>>>
>>>> If you want to know more about Flink's Parquet-Avro writer, feel free
>>>> to have a look at the ParquetAvroWriters
>>>> class.
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>>
>>>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi 
>>>> wrote:
>>>>
>>>>> Thanks a lot Kostas, but the file not created . what am I doing wrong?
>>>>> BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
>>>>>
>>>>> object Tester extends App {
>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>   def now = System.currentTimeMillis()
>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>   val streamingSink = StreamingFileSink.forBulkFormat( path,
>>>>>   ParquetAvroWriters.forGenericRecord(schema))
>>>>>   .build()
>>>>>   env.enableCheckpointing(100)
>>>>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
>>>>> val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>> genericReocrd.put("name", r.name)
>>>>> genericReocrd.put("code", r.code.asString)
>>>>> genericReocrd.put("ts", r.ts)
>>>>> genericReocrd
>>>>>   }
>>>>> stream.addSink { r =>
>>>>> println(s"In Sink $r") //getting this line
>>>>> streamingSink
>>>>> }
>>>>>   env.execute()
>>>>> }
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>>
>>>>>> Sorry, previously I got confused and I assumed you were using Flink's
>>>>>> StreamingFileSink.
>>>>>>
>>>>>> Could you try to use Flink's Avro - Parquet writer?
>>>>>>
>>>>>> StreamingFileSink.forBulkFormat(
>>>>>>   Path...(MY_PATH),
>>>>>>   ParquetAvroWrite

Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-30 Thread Avi Levi
Thanks, looks good.
Do you know a way to use ParquetWriter or ParquetAvroWriters with a
BucketingSink file
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>
? Something like:

val bucketingSink = new BucketingSink[String]("/base/path")
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins


On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas 
wrote:

> And for a Java example which is actually similar to your pipeline,
> you can check the ParquetStreamingFileSinkITCase.
>
>
>
> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Avi,
>>
>> At a first glance I am not seeing anything wrong with your code.
>> Did you verify that there are elements flowing in your pipeline and that
>> checkpoints are actually completed?
>> And also can you check the logs at Job and Task Manager for anything
>> suspicious?
>>
>> Unfortunately, we do not allow specifying encoding and other parameters
>> to your writer, which is an omission
>> on our part and this should be fixed. Could you open a JIRA for that?
>>
>> If you want to know more about Flink's Parquet-Avro writer, feel free to
>> have a look at the ParquetAvroWriters
>> class.
>>
>> Cheers,
>> Kostas
>>
>>
>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi  wrote:
>>
>>> Thanks a lot Kostas, but the file not created . what am I doing wrong?
>>> BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
>>>
>>> object Tester extends App {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   def now = System.currentTimeMillis()
>>>   val path = new Path(s"test-$now.parquet")
>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>   val streamingSink = StreamingFileSink.forBulkFormat( path,
>>>   ParquetAvroWriters.forGenericRecord(schema))
>>>   .build()
>>>   env.enableCheckpointing(100)
>>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
>>> val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>> genericReocrd.put("name", r.name)
>>> genericReocrd.put("code", r.code.asString)
>>> genericReocrd.put("ts", r.ts)
>>> genericReocrd
>>>   }
>>> stream.addSink { r =>
>>> println(s"In Sink $r") //getting this line
>>> streamingSink
>>> }
>>>   env.execute()
>>> }
>>>
>>> Cheers
>>> Avi
>>>
>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
>>>>
>>>> Sorry, previously I got confused and I assumed you were using Flink's
>>>> StreamingFileSink.
>>>>
>>>> Could you try to use Flink's Avro - Parquet writer?
>>>>
>>>> StreamingFileSink.forBulkFormat(
>>>>   Path...(MY_PATH),
>>>>   ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>>> .build()
>>>>
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi 
>>>> wrote:
>>>>
>>>>> Thanks.
>>>>> yes, the *env.execute* is called and enabled checkpoints
>>>>> I think the problem is where to place the *writer.close *to flush the
>>>>> cache
>>>>> If I'll place on the sink after the write event e.g
>>>>> addSink{
>>>>> writer.write
>>>>> writer.close
>>>>> }
>>>>> in this case only the first record will be included in the file but
>>>>> not the rest of the stream.
>>>>>
>>>>>
>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi again Avi,
>>>>>>
>>>>>> In the first example that you posted (the one with the Kafka source),
>>>>>> do you call env.execute()?
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>&g

Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-29 Thread Avi Levi
Thanks a lot Kostas, but the file is not created. What am I doing wrong?
BTW, how can you set the encoding etc. in Flink's Avro - Parquet writer?

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val streamingSink = StreamingFileSink.forBulkFormat( path,
  ParquetAvroWriters.forGenericRecord(schema))
  .build()
  env.enableCheckpointing(100)
  val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
  }
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
  env.execute()
}

Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas 
wrote:

>
> Sorry, previously I got confused and I assumed you were using Flink's
> StreamingFileSink.
>
> Could you try to use Flink's Avro - Parquet writer?
>
> StreamingFileSink.forBulkFormat(
>   Path...(MY_PATH),
>   ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
> .build()
>
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi  wrote:
>
>> Thanks.
>> yes, the *env.execute* is called and enabled checkpoints
>> I think the problem is where to place the *writer.close *to flush the
>> cache
>> If I'll place on the sink after the write event e.g
>> addSink{
>> writer.write
>> writer.close
>> }
>> in this case only the first record will be included in the file but not
>> the rest of the stream.
>>
>>
>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi again Avi,
>>>
>>> In the first example that you posted (the one with the Kafka source), do
>>> you call env.execute()?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> In the last snippet that you posted, you have not activated checkpoints.
>>>>
>>>> Checkpoints are needed for the StreamingFileSink to produce results,
>>>> especially in the case of BulkWriters (like Parquet) where
>>>> the part file is rolled upon reception of a checkpoint and the part is
>>>> finalised (i.e. "committed") when the checkpoint gets completed
>>>> successfully.
>>>>
>>>> Could you please enable checkpointing and make sure that the job runs
>>>> long enough for at least some checkpoints to be completed?
>>>>
>>>> Thanks a lot,
>>>> Kostas
>>>>
>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi 
>>>> wrote:
>>>>
>>>>> Checkout this little App. you can see that the file is created but no
>>>>> data is written. even for a single record
>>>>>
>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>> import org.apache.avro.Schema
>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>> import org.apache.hadoop.fs.Path
>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>> import scala.io.Source
>>>>> import org.apache.flink.streaming.api.scala._
>>>>>
>>>>> object Tester extends App {
>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>   def now = System.currentTimeMillis()
>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>   val schemaString = 
>>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>   val config = ParquetWriterConfig()
>>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>   genericReocrd.put("name", "test_b")
>&

Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-29 Thread Avi Levi
Thanks.
Yes, *env.execute* is called and checkpoints are enabled.
I think the problem is where to place *writer.close* to flush the cache.
If I place it in the sink after the write event, e.g.
addSink{
writer.write
writer.close
}
then only the first record will be included in the file but not the
rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas 
wrote:

> Hi again Avi,
>
> In the first example that you posted (the one with the Kafka source), do
> you call env.execute()?
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Avi,
>>
>> In the last snippet that you posted, you have not activated checkpoints.
>>
>> Checkpoints are needed for the StreamingFileSink to produce results,
>> especially in the case of BulkWriters (like Parquet) where
>> the part file is rolled upon reception of a checkpoint and the part is
>> finalised (i.e. "committed") when the checkpoint gets completed
>> successfully.
>>
>> Could you please enable checkpointing and make sure that the job runs
>> long enough for at least some checkpoints to be completed?
>>
>> Thanks a lot,
>> Kostas
>>
>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi  wrote:
>>
>>> Checkout this little App. you can see that the file is created but no
>>> data is written. even for a single record
>>>
>>> import io.eels.component.parquet.ParquetWriterConfig
>>> import org.apache.avro.Schema
>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.hadoop.fs.Path
>>> import org.apache.parquet.avro.AvroParquetWriter
>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>> import scala.io.Source
>>> import org.apache.flink.streaming.api.scala._
>>>
>>> object Tester extends App {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   def now = System.currentTimeMillis()
>>>   val path = new Path(s"test-$now.parquet")
>>>   val schemaString = 
>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>   val config = ParquetWriterConfig()
>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>   genericReocrd.put("name", "test_b")
>>>   genericReocrd.put("code", "NoError")
>>>   genericReocrd.put("ts", 100L)
>>>   val stream = env.fromElements(genericReocrd)
>>>   val writer: ParquetWriter[GenericRecord] = 
>>> AvroParquetWriter.builder[GenericRecord](path)
>>> .withSchema(schema)
>>> .withCompressionCodec(compressionCodecName)
>>> .withPageSize(config.pageSize)
>>> .withRowGroupSize(config.blockSize)
>>> .withDictionaryEncoding(config.enableDictionary)
>>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>> .withValidation(config.validating)
>>> .build()
>>>
>>>   writer.write(genericReocrd)
>>>   stream.addSink { r =>
>>> println(s"In Sink $r")
>>> writer.write(r)
>>>   }
>>>   env.execute()
>>>   //  writer.close()
>>> }
>>>
>>>
>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh  wrote:
>>>
>>>> Can you try closing the writer?
>>>>
>>>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>>>> snapshot()( since you are checkpointing hence this method will be called)
>>>>
>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi 
>>>> wrote:
>>>>
>>>>> Thanks Rafi,
>>>>> I am actually not using assignTimestampsAndWatermarks , I will try to
>>>>> add it as you suggested. however it seems that the messages I repeating in
>>>>> the stream over and over even if I am pushing single message manually to
>>>>> the queue, that message will repeat infinity
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>>
>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch 
>>>>> wrote:
>>>>>
>>>>>> Hi Avi,
>>&

Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-28 Thread Avi Levi
Check out this little app. You can see that the file is created but no data
is written, even for a single record.

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schemaString =
Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()
  val genericReocrd: GenericRecord = new GenericData.Record(schema)
  genericReocrd.put("name", "test_b")
  genericReocrd.put("code", "NoError")
  genericReocrd.put("ts", 100L)
  val stream = env.fromElements(genericReocrd)
  val writer: ParquetWriter[GenericRecord] =
AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

  writer.write(genericReocrd)
  stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
  }
  env.execute()
  //  writer.close()
}


On Thu, Nov 29, 2018 at 6:57 AM vipul singh  wrote:

> Can you try closing the writer?
>
> AvroParquetWriter has an internal buffer. Try doing a .close() in
> snapshot()( since you are checkpointing hence this method will be called)
>
> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi  wrote:
>
>> Thanks Rafi,
>> I am actually not using assignTimestampsAndWatermarks , I will try to
>> add it as you suggested. however it seems that the messages I repeating in
>> the stream over and over even if I am pushing single message manually to
>> the queue, that message will repeat infinity
>>
>> Cheers
>> Avi
>>
>>
>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch  wrote:
>>
>>> Hi Avi,
>>>
>>> I can't see the part where you use  assignTimestampsAndWatermarks.
>>> If this part in not set properly, it's possible that watermarks are not
>>> sent and nothing will be written to your Sink.
>>>
>>> See here for more details:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>
>>> Hope this helps,
>>> Rafi
>>>
>>> On Wed, Nov 28, 2018, 21:22 Avi Levi >>
>>>> Hi,
>>>>
>>>> I am trying to implement Parquet Writer as SinkFunction. The pipeline
>>>> consists of kafka as source and parquet file as a sink however it seems
>>>> like the stream is repeating itself like endless loop and the parquet file
>>>> is not written . can someone please help me with this?
>>>>
>>>> object ParquetSinkWriter{
>>>>   private val path = new Path("tmp/pfile")
>>>>   private val schemaString =
>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>   private val avroSchema: Schema = new
>>>> Schema.Parser().parse(schemaString)
>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>   private   val config = ParquetWriterConfig()
>>>>   val writer: ParquetWriter[GenericRecord] =
>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>> .withSchema(avroSchema)
>>>> .withCompressionCodec(compressionCodecName)
>>>> .withPageSize(config.pageSize)
>>>> .withRowGroupSize(config.blockSize)
>>>> .withDictionaryEncoding(config.enableDictionary)
>>>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>> .withValidation(config.validating)
>>>> .build()
>>>> }
>>>>
>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
>>>> SinkFunction[GenericRecord] {
>>>>   import ParquetSinkWriter._
>>>>   override def invoke(value: GenericRecord): Uni
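For future readers, a sketch of the suggestion quoted above: close the Parquet writer when a checkpoint is taken so its buffered rows reach the file, then start a new file. Everything here is illustrative (the class name, path scheme and roll-on-snapshot strategy are assumptions), and it offers none of the guarantees of StreamingFileSink's bulk format.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

class CheckpointedParquetSink(baseDir: String, schemaString: String)
    extends RichSinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var schema: Schema = _
  @transient private var writer: ParquetWriter[GenericRecord] = _

  private def openWriter(): ParquetWriter[GenericRecord] =
    AvroParquetWriter.builder[GenericRecord](new Path(s"$baseDir/part-${System.currentTimeMillis()}.parquet"))
      .withSchema(schema)
      .build()

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString)
    writer = openWriter()
  }

  override def invoke(value: GenericRecord): Unit = writer.write(value)

  // Parquet buffers rows in memory and only writes its footer on close(),
  // which is why nothing is visible in the file while the writer stays open.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    writer.close()        // finalise the current file so its contents become readable
    writer = openWriter() // start a fresh file for records arriving after this checkpoint
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()

  override def close(): Unit = if (writer != null) writer.close()
}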

Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-28 Thread Avi Levi
Thanks Rafi,
I am actually not using assignTimestampsAndWatermarks; I will try to add
it as you suggested. However, it seems that the messages are repeating in the
stream over and over. Even if I push a single message manually to the
queue, that message repeats infinitely.

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch  wrote:

> Hi Avi,
>
> I can't see the part where you use  assignTimestampsAndWatermarks.
> If this part is not set properly, it's possible that watermarks are not
> sent and nothing will be written to your Sink.
>
> See here for more details:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> Hope this helps,
> Rafi
>
> On Wed, Nov 28, 2018, 21:22 Avi Levi 
>> Hi,
>>
>> I am trying to implement Parquet Writer as SinkFunction. The pipeline
>> consists of kafka as source and parquet file as a sink however it seems
>> like the stream is repeating itself like endless loop and the parquet file
>> is not written . can someone please help me with this?
>>
>> object ParquetSinkWriter{
>>   private val path = new Path("tmp/pfile")
>>   private val schemaString =
>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>   private   val config = ParquetWriterConfig()
>>   val writer: ParquetWriter[GenericRecord] =
>> AvroParquetWriter.builder[GenericRecord](path)
>> .withSchema(avroSchema)
>> .withCompressionCodec(compressionCodecName)
>> .withPageSize(config.pageSize)
>> .withRowGroupSize(config.blockSize)
>> .withDictionaryEncoding(config.enableDictionary)
>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>> .withValidation(config.validating)
>> .build()
>> }
>>
>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
>> SinkFunction[GenericRecord] {
>>   import ParquetSinkWriter._
>>   override def invoke(value: GenericRecord): Unit = {
>> println(s"ADDING TO File : $value") // getting this output
>> writer.write(value) //the output is not written to the file
>>   }
>> }
>>
>> //main app
>> object StreamingJob extends App  {
>>  implicit val env: StreamExecutionEnvironment =
>> StreamExecutionEnvironment.getExecutionEnvironment
>>   env.enableCheckpointing(500)
>>
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>> Time.seconds(3), Time.seconds(3)))
>>   val backend: StateBackend = new
>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>   env.setStateBackend(backend)
>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>   *val stream2: DataStream[DnsRequest] = env.addSource(//consume from
>> kafka)*
>> *stream2.map { r =>*
>> *println(s"MAPPING $r") //this output keeps repeating in a loop*
>> *val genericReocrd: GenericRecord = new GenericData.Record(schema)*
>> *genericReocrd.put("qname", r.qname)*
>> *genericReocrd.put("rcode", r.rcode)*
>> *genericReocrd.put("ts", r.ts)*
>> *genericReocrd*
>> *  }.addSink(writer) *
>>
>> Thanks for your help
>> Avi
>>
>>
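For completeness, a sketch of the assignTimestampsAndWatermarks call Rafi refers to, attached directly to the Kafka consumer (other messages in this thread point at checkpointing rather than watermarks as the actual blocker, but the call itself looks like this). DnsRequest and its ts field are taken from the snippets above; the 10-second out-of-orderness bound is an arbitrary assumption.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

consumer.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(10)) {
    // the record's own ts field becomes the event timestamp
    override def extractTimestamp(element: DnsRequest): Long = element.ts
  })

val stream = env.addSource(consumer)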


Stream in loop and not getting to sink (Parquet writer )

2018-11-28 Thread Avi Levi
Hi,

I am trying to implement a Parquet writer as a SinkFunction. The pipeline
consists of Kafka as the source and a Parquet file as the sink; however, it seems
like the stream is repeating itself in an endless loop and the Parquet file
is not written. Can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString =
Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] =
AvroParquetWriter.builder[GenericRecord](path)
.withSchema(avroSchema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
println(s"ADDING TO File : $value") // getting this output
writer.write(value) //the output is not written to the file
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)

env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new
RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  *val stream2: DataStream[DnsRequest] = env.addSource(//consume from
kafka)*
*stream2.map { r =>*
*println(s"MAPPING $r") //this output keeps repeating in a loop*
*val genericReocrd: GenericRecord = new GenericData.Record(schema)*
*genericReocrd.put("qname", r.qname)*
*genericReocrd.put("rcode", r.rcode)*
*genericReocrd.put("ts", r.ts)*
*genericReocrd*
*  }.addSink(writer) *

Thanks for your help
Avi


Re: your advice please regarding state

2018-11-27 Thread Avi Levi
Thank you very much. got it.

On Tue, Nov 27, 2018 at 12:53 PM Fabian Hueske  wrote:

> Hi Avi,
>
> I'd definitely go for approach #1.
> Flink will hash partition the records across all nodes. This is basically
> the same as a distributed key-value store sharding keys.
> I would not try to fine-tune the partitioning. You should try to use as
> many keys as possible to ensure an even distribution of keys. This will also
> allow you to scale your application later to more tasks.
>
> Best, Fabian
>
> Am Di., 27. Nov. 2018 um 05:21 Uhr schrieb yinhua.dai <
> yinhua.2...@outlook.com>:
>
>> General approach#1 is ok, but you may have to use some hash based key
>> selector if you have a heavy data skew.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
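A sketch of approach #1 as described above, with the car id itself as the key and a per-key flag in (RocksDB-backed) ValueState; the class and field names are assumptions, and routing the two outputs to their Kafka topics is left out.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

case class Car(id: String, ts: Long)

class FirstSeenFunction extends RichFlatMapFunction[Car, (Car, Boolean)] {
  @transient private var seen: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    seen = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean]))
  }

  override def flatMap(car: Car, out: Collector[(Car, Boolean)]): Unit = {
    val isNew = seen.value() == null // null means this key was never seen before
    if (isNew) seen.update(true)
    out.collect((car, isNew))        // downstream can route new vs. known cars to two Kafka topics
  }
}

// usage sketch: env.addSource(consumer).keyBy(_.id).flatMap(new FirstSeenFunction)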


Re: understanding kafka connector - rebalance

2018-11-26 Thread Avi Levi
Thanks, that makes sense !

On Mon, Nov 26, 2018 at 1:06 PM Fabian Hueske  wrote:

> Hi,
>
> DataStream x = ...
> x.rebalance().keyBy()
>
> is not a good idea.
>
> It will first distribute the records round-robin (over the network) and
> subsequently partition them by hash.
> The first shuffle is unnecessary. It does not have any effect because it
> is undone by the second partitioning.
>
> Btw. any methods on DataStream do not have any effect on Kafka topcis or
> partitions.
> In the initially quoted example, we assume that the events of the original
> DataStream are not evenly distributed among the parallel tasks. The
> rebalance() call generates an even distribution which is especially
> important if the map() operation is heavy-weight / compute intensive.
>
> Best, Fabian
>
>
>
>
>
> Am Mo., 26. Nov. 2018 um 10:59 Uhr schrieb Taher Koitawala <
> taher.koitaw...@gslab.com>:
>
>> You can use rebalance before keyBy because rebalance returns DataStream.
>> The API does not allow rebalance on keyedStreamed which is returned after
>> keyBy so you are safe.
>>
>> On Mon 26 Nov, 2018, 2:25 PM Avi Levi >
>>> Ok, thanks for the clarification. but if I use it with keyed state so
>>> the partition is by the key. rebalancing will not shuffle this partitioning
>>> ? e.g
>>> .addSource(source)
>>>   .rebalance
>>>   .keyBy(_.id)
>>>   .mapWithState(...)
>>>
>>>
>>> On Mon, Nov 26, 2018 at 8:32 AM Taher Koitawala <
>>> taher.koitaw...@gslab.com> wrote:
>>>
>>>> Hi Avi,
>>>>   No, rebalance is not changing the number of kafka partitions.
>>>> Lets say you have 6 kafka partitions and your flink parallelism is 8, in
>>>> this case using rebalance will send records to all downstream operators in
>>>> a round robin fashion.
>>>>
>>>> Regards,
>>>> Taher Koitawala
>>>> GS Lab Pune
>>>> +91 8407979163
>>>>
>>>>
>>>> On Mon, Nov 26, 2018 at 11:33 AM Avi Levi 
>>>> wrote:
>>>>
>>>>> Hi
>>>>> Looking at this example
>>>>> <https://github.com/dataArtisans/kafka-example/blob/master/src/main/java/com/dataartisans/ReadFromKafka.java>,
>>>>> doing the "rebalance" (e.g messageStream.rebalance().map(...) )
>>>>> operation on heavy load stream wouldn't slow the stream ? is the
>>>>> rebalancing action occurs only when there is a partition change ?
>>>>> it says that "the rebelance call is causing a repartitioning of the
>>>>> data so that all machines" is it actually changing the num of
>>>>> partitions of the topic to match the num of flink operators ?
>>>>>
>>>>> Avi
>>>>>
>>>>
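To make the contrast above concrete, two snippet-level sketches (consumer is the Kafka consumer from the earlier messages; expensiveEnrichment and perKeyLogic are hypothetical functions):

// rebalance pays off before a heavy, non-keyed map when the source partitions are skewed
val enriched = env.addSource(consumer).rebalance.map(expensiveEnrichment _)

// immediately before keyBy it is redundant: the hash partitioning of keyBy
// undoes the round-robin shuffle, so the extra network hop buys nothing
val keyed = env.addSource(consumer).keyBy(_.id).map(perKeyLogic _)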


Re: understanding kafka connector - rebalance

2018-11-26 Thread Avi Levi
Ok, thanks for the clarification. But if I use it with keyed state, the
partitioning is by the key. Rebalancing will not shuffle this partitioning?
e.g.
.addSource(source)
  .rebalance
  .keyBy(_.id)
  .mapWithState(...)


On Mon, Nov 26, 2018 at 8:32 AM Taher Koitawala 
wrote:

> Hi Avi,
>   No, rebalance is not changing the number of kafka partitions.
> Lets say you have 6 kafka partitions and your flink parallelism is 8, in
> this case using rebalance will send records to all downstream operators in
> a round robin fashion.
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>
>
> On Mon, Nov 26, 2018 at 11:33 AM Avi Levi  wrote:
>
>> Hi
>> Looking at this example
>> <https://github.com/dataArtisans/kafka-example/blob/master/src/main/java/com/dataartisans/ReadFromKafka.java>,
>> doing the "rebalance" (e.g messageStream.rebalance().map(...) )
>> operation on heavy load stream wouldn't slow the stream ? is the
>> rebalancing action occurs only when there is a partition change ?
>> it says that "the rebelance call is causing a repartitioning of the data
>> so that all machines" is it actually changing the num of partitions of
>> the topic to match the num of flink operators ?
>>
>> Avi
>>
>


understanding kafka connector - rebalance

2018-11-25 Thread Avi Levi
Hi
Looking at this example,
wouldn't doing the "rebalance" operation (e.g. messageStream.rebalance().map(...))
on a heavy-load stream slow the stream? Does the rebalancing action
occur only when there is a partition change?
It says that "the rebalance call is causing a repartitioning of the data so
that all machines". Is it actually changing the number of partitions of the
topic to match the number of Flink operators?

Avi


Re: where can I see logs from code

2018-11-25 Thread Avi Levi
Hi Miki,
Thanks for your reply. However, I do not see the logs written from the code
(I do use logback).

On Sun, Nov 25, 2018 at 12:30 PM miki haiat  wrote:

>
> You can see the logs in the webUI.
> If you click on the Task manager tab you can find the logs
>
> http://SERVERADD/#/taskmanager/TM_ID/log
>
>
>
>
>
> On Sun, Nov 25, 2018 at 12:11 PM Avi Levi  wrote:
>
>> Hi,
>> Where can I see the logs written by the app code (i.e by the app
>> developer) ?
>>
>> BR
>> Avi
>>
>
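As a concrete illustration of the answer above, a user-code logger sketch (MyMapper and the message text are hypothetical). With SLF4J, the statement below is written by the TaskManager process that runs the task, so it shows up in the TaskManager log in the web UI (or its log file on disk), not in the console of the client that ran flink run; the assumption here is that the logging backend and its configuration come from the Flink distribution's lib/ and conf/ directories rather than from the fat jar.

import org.apache.flink.api.common.functions.MapFunction
import org.slf4j.{Logger, LoggerFactory}

class MyMapper extends MapFunction[String, String] {
  @transient private lazy val log: Logger = LoggerFactory.getLogger(classOf[MyMapper])

  override def map(value: String): String = {
    log.info("processing element {}", value) // lands in the TaskManager's log, per the reply above
    value
  }
}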


where can I see logs from code

2018-11-25 Thread Avi Levi
Hi,
Where can I see the logs written by the app code (i.e by the app developer)
?

BR
Avi


Re: your advice please regarding state

2018-11-21 Thread Avi Levi
Thanks a lot!  got it :)

On Wed, Nov 21, 2018 at 11:40 PM Jamie Grier  wrote:

> Hi Avi,
>
> The typical approach would be as you've described in #1.  #2 is not
> necessary -- #1 is already doing basically exactly that.
>
> -Jamie
>
>
> On Wed, Nov 21, 2018 at 3:36 AM Avi Levi  wrote:
>
>> Hi ,
>> I am very new to flink so please be gentle :)
>>
>> *The challenge:*
>> I have a road sensor that should scan billons of cars per day. for
>> starter I want to recognise if each car that passes by is new or not. new
>> cars (never been seen before by that sensor ) will be placed on a different
>> topic on kafka than the other (total of two topics for new and old) .
>>  under the assumption that the state will contain billions of unique car
>> ids.
>>
>> *Suggested Solutions*
>> My question is it which approach is better.
>> Both approaches using RocksDB
>>
>> 1. use the ValueState and to split the steam like
>>   *val domainsSrc = env*
>> *.addSource(consumer)*
>> *.keyBy(car => car.id <http://car.id>)*
>> *.map(...)*
>> and checking if the state value is null to recognise new cars. if new
>> than I will update the state
>> how will the persistent data will be shard among the nodes in the cluster
>> (let's say that I have 10 nodes) ?
>>
>> 2. use MapState and to partition the stream to groups by some arbitrary
>> factor e.g
>> *val domainsSrc = env*
>> *.addSource(consumer)*
>> *.keyBy{ car =>*
>> *val h car.id.hashCode % partitionFactor*
>> *math.abs(h)*
>> *} .map(...)*
>> and to check *mapState.keys.contains(car.id <http://car.id>) *if not -
>> add it to the state
>>
>> which approach is better ?
>>
>> Thanks in advance
>> Avi
>>
>


your advice please regarding state

2018-11-21 Thread Avi Levi
Hi ,
I am very new to flink so please be gentle :)

*The challenge:*
I have a road sensor that should scan billions of cars per day. For starters,
I want to recognise whether each car that passes by is new or not. New cars
(never seen before by that sensor) will be placed on a different
topic on Kafka than the others (a total of two topics, for new and old),
under the assumption that the state will contain billions of unique car
ids.

*Suggested Solutions*
My question is which approach is better.
Both approaches use RocksDB.

1. Use ValueState and split the stream like
  *val domainsSrc = env*
*.addSource(consumer)*
*.keyBy(car => car.id )*
*.map(...)*
and check if the state value is null to recognise new cars. If it is new, then
I will update the state.
How will the persistent data be sharded among the nodes in the cluster
(let's say that I have 10 nodes)?

2. Use MapState and partition the stream into groups by some arbitrary
factor, e.g.
*val domainsSrc = env*
*.addSource(consumer)*
*.keyBy{ car =>*
*val h = car.id.hashCode % partitionFactor*
*math.abs(h)*
*} .map(...)*
and to check *mapState.keys.contains(car.id ) *if not - add
it to the state

which approach is better ?

Thanks in advance
Avi


Re: Could not extract key Exception only on runtime not in dev environment

2018-11-20 Thread Avi Levi
Yes, you can also see it in the message (and it should also have crashed
in the IDE). Furthermore, to be sure, I added a filter that looks like
*env*
*.addSource(kafka_source)*
*.filter(_.id != null)*
*.keyBy{ r =>*
*val h = fastHash(r.id <http://r.id>) % partitionFactor*
*math.abs(h)*
*}*
*.map(...)*

and still the same

On Tue, Nov 20, 2018 at 5:31 PM miki haiat  wrote:

> What is the r.id value?
> Are you sure that it is not null?
>
> Miki.
>
>
> On Tue, 20 Nov 2018, 17:26 Avi Levi 
>> I am running flink locally on my machine , I am getting the exception
>> below when reading from kafka topic. when running from the ide (intellij)
>> it is running perfectly. however when I deploy my jar to flink runtime
>> (locally) using
>>
>> */bin/flink run ~MyApp-1.0-SNAPSHOT.jar*
>> my class looks like this
>> case class Foo(id: String, value: String, timestamp: Long, counter: Int)
>> I am getting this exception
>>
>> *java.lang.RuntimeException: Could not extract key from 
>> Foo("some-uuid","text",1540348398,1)
>>  at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>  at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>  at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Could not extract key from 
>> Foo("some-uuid","text",1540348398,1)
>>  at 
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>>  at 
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>>  at 
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
>>  at 
>> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>>  at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>  ... 22 more
>> Caused by: java.lang.NullPointerException
>>  at com.bluevoya

Could not extract key Exception only on runtime not in dev environment

2018-11-20 Thread Avi Levi
I am running Flink locally on my machine and I am getting the exception below
when reading from a Kafka topic. When running from the IDE (IntelliJ) it runs
perfectly; however, when I deploy my jar to the Flink runtime (locally)
using

*/bin/flink run ~MyApp-1.0-SNAPSHOT.jar*
my class looks like this
case class Foo(id: String, value: String, timestamp: Long, counter: Int)
I am getting this exception

*java.lang.RuntimeException: Could not extract key from
Foo("some-uuid","text",1540348398,1)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Could not extract key from
Foo("some-uuid","text",1540348398,1)
at 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
at 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 22 more
Caused by: java.lang.NullPointerException
at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:41)
at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:40)
at 
org.apache.flink.streaming.api.scala.DataStream$$anon$2.getKey(DataStream.scala:411)
at 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
... 26 more*


my key partitioning is simple (partitionFactor = some number)

*.keyBy{ r =>
val h = fastHash(r.id ) % partitionFactor
math.abs(h)
}*

Again, this happens only at runtime, not when I run it from IntelliJ.

This is so frustrating. Any advice?
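For readers hitting the same trace: the NullPointerException is raised inside the key selector (DataStream$$anon$2.getKey), i.e. the partitioner was handed a record whose id, or the record itself, was null. Below is a defensive variant of the quoted selector, purely as a sketch; fastHash, partitionFactor and kafka_source are the names used in this thread.

val keyed = env
  .addSource(kafka_source)
  .filter(r => r != null && r.id != null) // keep null records away from the partitioner
  .keyBy { r =>
    math.abs(fastHash(r.id) % partitionFactor)
  }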


ClassNotFoundException: org.apache.kafka.common.metrics.stats.Rate$1

2018-11-20 Thread Avi Levi
looking at the log file of the taskexecutor I see this exception:

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.stats.Rate$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 22 more

Does anyone know what I should do to avoid it?