Flink SQL

2018-11-29 Thread Steve Bistline
Hi,

I have a silly question about Flink SQL that I cannot seem to find a clear
answer to. Given the following code, will the "result" of the SQL SELECT
statement only be produced, and the data then be written to S3, if and only
if the statement returns data that matches the criteria?

Does "nothing" happen otherwise (i.e. no match for the SQL statement)?

tableEnv.registerDataStream("SENSORS", dataset, "t_deviceID, t_timeStamp, t_sKey, t_sValue");

// TEMPERATURE
Table result = tableEnv.sql("SELECT 'AlertTEMPERATURE ', t_sKey, t_deviceID, t_sValue FROM SENSORS"
    + " WHERE t_sKey='TEMPERATURE' AND t_sValue > " + TEMPERATURE_THRESHOLD);
tableEnv.toAppendStream(result, Row.class).print();

// Write to S3 bucket
DataStream<Row> dsRow = tableEnv.toAppendStream(result, Row.class);
String fileNameTemp = sdf.format(new Date());
dsRow.writeAsText("s3://csv-ai/flink-alerts/" + fileNameTemp + "TEMPERATURE.txt");
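
For reference, a minimal, self-contained sketch (Flink 1.6 Java APIs; the sample values and the threshold are made up) of the same pattern. The append stream produced by toAppendStream() only carries rows that match the WHERE clause, so a sink attached to it receives no records when nothing matches:

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SensorAlertSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        double temperatureThreshold = 100.0; // stands in for TEMPERATURE_THRESHOLD

        DataStream<Tuple4<String, Long, String, Double>> dataset = env.fromElements(
            Tuple4.of("dev-1", 1000L, "TEMPERATURE", 120.0),   // matches the predicate
            Tuple4.of("dev-2", 2000L, "TEMPERATURE", 80.0));   // filtered out

        tableEnv.registerDataStream("SENSORS", dataset,
            "t_deviceID, t_timeStamp, t_sKey, t_sValue");

        Table result = tableEnv.sqlQuery(
            "SELECT 'AlertTEMPERATURE', t_sKey, t_deviceID, t_sValue " +
            "FROM SENSORS WHERE t_sKey = 'TEMPERATURE' AND t_sValue > " + temperatureThreshold);

        // One Row per matching input record; no matches means no output records are emitted.
        DataStream<Row> dsRow = tableEnv.toAppendStream(result, Row.class);
        dsRow.print();

        env.execute("sensor-alert-sketch");
    }
}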


Flink window operation based on event time is triggered when the watermark is less than the end of the window

2018-11-29 Thread X L
Please refer to the Stack Overflow post.
Thanks.

-- 
Thanks.

·
Lx
wlxwol...@gmail.com


Re: how to override s3 key config in flink job

2018-11-29 Thread Tony Wei
Hi Andrey,

Thanks for your detailed answer. I have created a JIRA issue to discuss
it [1].
Please check the description and help me fill in the details, such as the
component/s, since I'm not sure where it should be filed. Thank you very much.

Best,
Tony Wei

[1] https://issues.apache.org/jira/browse/FLINK-11034


Andrey Zagrebin wrote on Tue, Nov 27, 2018 at 10:43 PM:

> Hi Tony,
>
> File system factories are class-loaded in running JVMs of task executors.
> That is why their configured objects are shared by different Flink jobs.
> It is not possible to change their options per created file system and per
> job at the moment.
>
> This could be changed, e.g. for S3, by providing a "rewriting config" to the
> file system factory's "get" method, but this method is not usually called by
> users directly in user-facing components like checkpointing or the file sink.
> The user API is now mainly the file system URI string, without any specific config.
>
> I see that making this possible has value, but it would require some involved
> changes in the file-system-dependent APIs, or changing the way file systems
> are created in general.
> You could create a JIRA issue to discuss it.
>
> Best,
> Andrey
>
> > On 27 Nov 2018, at 10:06, yinhua.dai  wrote:
> >
> > It might be difficult, as the task manager and job manager are pre-started
> > in session mode.
> >
> > It seems that the Flink HTTP server will always use the configuration that
> > you specified when you started your Flink cluster (i.e. start-cluster.sh);
> > I don't find a way to override it.
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


long lived standalone job session cluster in kubernetes

2018-11-29 Thread Derek VerLee

  
  
I'm looking at the job cluster mode; it looks great and I am
considering migrating our jobs off our "legacy" session cluster
and into Kubernetes.

I do need to ask some questions, because I haven't found a lot of
details in the documentation about how it works yet, and I gave up
following the DI around in the code after a while.

Let's say I have a deployment for the job "leader" in HA with ZK,
and another deployment for the taskmanagers.
I want to upgrade the code or configuration and start from a
savepoint, in an automated way.

Best I can figure, I cannot just update the deployment resources
in Kubernetes and allow the containers to restart in an arbitrary
order.
Instead, I expect sequencing is important, something along the
lines of this:

1. issue savepoint command on leader
2. wait for savepoint
3. destroy all leader and taskmanager containers
4. deploy new leader, with savepoint url
5. deploy new taskmanagers

For example, without that ordering I imagine old taskmanagers (with an
old version of my job) attaching to the new leader and causing a problem.
Does that sound right, or am I overthinking it?

If not, has anyone tried implementing any automation for this yet?
  



Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-29 Thread Avi Levi
Thanks a lot Kostas, but the file is not created. What am I doing wrong?
BTW, how can you set the encoding etc. in Flink's Avro-Parquet writer?

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val streamingSink = StreamingFileSink.forBulkFormat( path,
  ParquetAvroWriters.forGenericRecord(schema))
  .build()
  env.enableCheckpointing(100)
  val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
  }
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
  env.execute()
}

Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas 
wrote:

>
> Sorry, previously I got confused and I assumed you were using Flink's
> StreamingFileSink.
>
> Could you try to use Flink's Avro - Parquet writer?
>
> StreamingFileSink.forBulkFormat(
>   Path...(MY_PATH),
>   ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
> .build()
>
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi  wrote:
>
>> Thanks.
>> yes, the *env.execute* is called and enabled checkpoints
>> I think the problem is where to place the *writer.close *to flush the
>> cache
>> If I'll place on the sink after the write event e.g
>> addSink{
>> writer.write
>> writer.close
>> }
>> in this case only the first record will be included in the file but not
>> the rest of the stream.
>>
>>
>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi again Avi,
>>>
>>> In the first example that you posted (the one with the Kafka source), do
>>> you call env.execute()?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
 Hi Avi,

 In the last snippet that you posted, you have not activated checkpoints.

 Checkpoints are needed for the StreamingFileSink to produce results,
 especially in the case of BulkWriters (like Parquet) where
 the part file is rolled upon reception of a checkpoint and the part is
 finalised (i.e. "committed") when the checkpoint gets completed
 successfully.

 Could you please enable checkpointing and make sure that the job runs
 long enough for at least some checkpoints to be completed?

 Thanks a lot,
 Kostas

 On Thu, Nov 29, 2018 at 7:03 AM Avi Levi 
 wrote:

> Checkout this little App. you can see that the file is created but no
> data is written. even for a single record
>
> import io.eels.component.parquet.ParquetWriterConfig
> import org.apache.avro.Schema
> import org.apache.avro.generic.{ GenericData, GenericRecord }
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.avro.AvroParquetWriter
> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
> import org.apache.parquet.hadoop.metadata.CompressionCodecName
> import scala.io.Source
> import org.apache.flink.streaming.api.scala._
>
> object Tester extends App {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   def now = System.currentTimeMillis()
>   val path = new Path(s"test-$now.parquet")
>   val schemaString = 
> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>   val schema: Schema = new Schema.Parser().parse(schemaString)
>   val compressionCodecName = CompressionCodecName.SNAPPY
>   val config = ParquetWriterConfig()
>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>   genericReocrd.put("name", "test_b")
>   genericReocrd.put("code", "NoError")
>   genericReocrd.put("ts", 100L)
>   val stream = env.fromElements(genericReocrd)
>   val writer: ParquetWriter[GenericRecord] = 
> AvroParquetWriter.builder[GenericRecord](path)
> .withSchema(schema)
> .withCompressionCodec(compressionCodecName)
> .withPageSize(config.pageSize)
> .withRowGroupSize(config.blockSize)
> .withDictionaryEncoding(config.enableDictionary)
> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
> .withValidation(config.validating)
> .build()
>
>   writer.write(genericReocrd)
>   stream.addSink { r =>
> println(s"In Sink $r")
> writer.write(r)
>   }
>   env.execute()
>   //  writer.close()
> }
>
>
> On Thu, Nov 29, 2018 at 6:57 AM vipul singh 
> wrote:
>
>> Can you try closing the writer?
>>
>> AvroParquetWriter has an internal buffer. Try 

Re: SQL Query named operator exceeds 80 characters

2018-11-29 Thread shkob1
OK, thanks for the help



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Table exception

2018-11-29 Thread Timo Walther

Hi Michael,

this dependency issue should have been fixed recently. Which Flink 
version are you using?


Regards,
Timo


On 29.11.18 at 16:01, TechnoMage wrote:
I have a simple test for looking at Flink SQL and hit an exception 
reported as a bug.  I wonder though if it is a missing dependency.


Michael

 Error in test
org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at 
com.cogility.hcep.tests.experimental.FlinkSQLTest.lambda$1(FlinkSQLTest.java:171)

at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table 
program cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.compile(GroupAggProcessFunction.scala:39)
at 
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:61)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
... 1 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 43, 
Column 10: Unknown variable or type 
"org.apache.commons.codec.binary.Base64"

at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6443)
at org.codehaus.janino.UnitCompiler.access$13000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6055)
at 
org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6052)

at org.codehaus.janino.Java$Package.accept(Java.java:4074)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438)
at org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)

at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050)
at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077)
at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073)

at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8591)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6689)
at org.codehaus.janino.UnitCompiler.access$15200(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$18$2.visitMethodInvocation(UnitCompiler.java:6100)
at 
org.codehaus.janino.UnitCompiler$18$2.visitMethodInvocation(UnitCompiler.java:6073)

at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4874)
at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at 
org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8802)

at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8688)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8590)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4708)
at org.codehaus.janino.UnitCompiler.access$8200(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4071)
at 
org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4044)

at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4874)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5224)

at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4667)
at org.codehaus.janino.UnitCompiler.access$7700(UnitCompiler.java:212)

Table exception

2018-11-29 Thread TechnoMage
I have a simple test for looking at Flink SQL and hit an exception reported as 
a bug.  I wonder though if it is a missing dependency.

Michael

 Error in test
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at 
com.cogility.hcep.tests.experimental.FlinkSQLTest.lambda$1(FlinkSQLTest.java:171)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.compile(GroupAggProcessFunction.scala:39)
at 
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:61)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
... 1 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 43, Column 10: 
Unknown variable or type "org.apache.commons.codec.binary.Base64"
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6443)
at org.codehaus.janino.UnitCompiler.access$13000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6055)
at 
org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6052)
at org.codehaus.janino.Java$Package.accept(Java.java:4074)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438)
at org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050)
at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077)
at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8591)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6689)
at org.codehaus.janino.UnitCompiler.access$15200(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$18$2.visitMethodInvocation(UnitCompiler.java:6100)
at 
org.codehaus.janino.UnitCompiler$18$2.visitMethodInvocation(UnitCompiler.java:6073)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4874)
at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
at 
org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8802)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8688)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8590)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4708)
at org.codehaus.janino.UnitCompiler.access$8200(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4071)
at 
org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4044)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:4874)
at 

Re: number of files in checkpoint directory grows endlessly

2018-11-29 Thread Andrey Zagrebin
Could you share the logs to check for possible failures to subsume or remove
previous checkpoints?
What are the sizes of the files? That can help to understand how compaction goes.
Could you also provide more details on how you set up TtlDb with Flink?

Best,
Andrey
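
For reference, a minimal sketch (assumptions: Flink 1.6 with the flink-statebackend-rocksdb dependency; the checkpoint URI and the option values are placeholders) of creating a RocksDBStateBackend with incremental checkpoints and custom RocksDB/column options, as mentioned in the quoted messages below:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class RocksDbConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // second argument enables incremental checkpoints
        RocksDBStateBackend backend =
            new RocksDBStateBackend("hdfs:///flink/checkpoints", true);

        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                // e.g. give compaction more background threads so it keeps up with updates
                return currentOptions.setMaxBackgroundCompactions(4);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                // e.g. trigger level-0 compaction after fewer files accumulate
                return currentOptions.setLevel0FileNumCompactionTrigger(4);
            }
        });

        env.setStateBackend(backend);
        env.enableCheckpointing(60_000);
        // ... define sources/operators/sinks and call env.execute()
    }
}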

> On 29 Nov 2018, at 11:34, Andrey Zagrebin  wrote:
> 
> Compaction merges SST files in background using native threads. While merging 
> it filters out removed and expired data. In general, the idea is that there 
> are enough resources for compaction to keep up with the DB update rate and 
> reduce storage. It can be quite IO intensive. Compaction has a lot of tuning 
> knobs and statistics to monitor the process [1] which are usually out of the 
> scope of Flink depending on state access pattern of the application. You can 
> create and set RocksDBStateBackend for your application in Flink and configure 
> it with custom RocksDB/column-specific options.
> 
> [1] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide 
> 
> [2] https://github.com/facebook/rocksdb/wiki/Compaction 
> 
> 
>> On 29 Nov 2018, at 11:20, > > > > wrote:
>> 
>> We use TtlDB because the state contents should expire automatically after 24 
>> hours. Therefore we only changed the state backend to use TtlDb instead of 
>> RocksDB with a fixed retention time.
>> 
>> We have slow IO because we only have SAN volumes available. Can you 
>> further clarify the problem with slow compaction?
>> 
>> Regards,
>> 
>> Bernd
>> 
>> 
>> -----Original Message-----
>> From: Andrey Zagrebin [mailto:and...@data-artisans.com 
>> ]
>> Sent: Thursday, 29 November 2018 11:01
>> To: Winterstein, Bernd
>> Cc: Kostas Kloudas; user; s.rich...@data-artisans.com 
>> ; t...@data-artisans.com 
>> ; step...@data-artisans.com 
>> 
>> Subject: Re: number of files in checkpoint directory grows endlessly
>> 
>> If you use incremental checkpoints, state backend stores raw RocksDB SST 
>> files which represent all state data. Each checkpoint adds SST files with 
>> new updates which are not present in previous checkpoint, basically their 
>> difference.
>> 
>> One of the following could be happening:
>> - old keys are not explicitly deleted or expire (depending on how TtlDb is 
>> used)
>> - compaction is too slow to drop older SST files for the latest checkpoint 
>> so that they can be deleted with the previous checkpoints
>> 
>>> On 29 Nov 2018, at 10:48, >> > >> > wrote:
>>> 
>>> Hi
>>> We use Flink 1..6.2. As for the checkpoint directory there is only one 
>>> chk-xxx directory. Therefore if would expect only one checkpoint remains.
>>> The value of 'state.checkpoints.num-retained’ is not set explicitly.
>>> 
>>> The problem is not the number of checkpoints but the number of files in the 
>>> "shared" directory next to the chk-xxx directory.
>>> 
>>> 
>>> -----Original Message-----
>>> From: Andrey Zagrebin [mailto:and...@data-artisans.com 
>>> ]
>>> Sent: Thursday, 29 November 2018 10:39
>>> To: Kostas Kloudas
>>> Cc: Winterstein, Bernd; user; Stefan Richter; Till Rohrmann; Stephan
>>> Ewen
>>> Subject: Re: number of files in checkpoint directory grows endlessly
>>> 
>>> Hi Bernd,
>>> 
>>> Did you change 'state.checkpoints.num-retained’ in flink-conf.yaml? By 
>>> default, only one checkpoint should be retained.
>>> 
>>> Which version of Flink do you use?
>>> Can you check Job Master logs whether you see there warning like this:
>>> `Fail to subsume the old checkpoint`?
>>> 
>>> Best,
>>> Andrey
>>> 
 On 29 Nov 2018, at 10:18, Kostas Kloudas >>> > wrote:
 
 Hi Bernd,
 
 I think the Till, Stefan or Stephan (cc'ed) are the best to answer your 
 question.
 
 Cheers,
 Kostas
>>> 
>>> 
>>> 
>>> 

Re: Dataset column statistics

2018-11-29 Thread Fabian Hueske
I'd try to tune it in a single query.
If that does not work, go for as few queries as possible, splitting by
column for better projection push-down.

This is the first time I hear somebody requesting ANALYZE TABLE.
I don't see a reason why it shouldn't be added in the future.
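
For reference, a minimal sketch (Flink 1.6 Java batch Table API assumed; the sample data, table, and column names are placeholders) of the two suggestions in this thread: enable object reuse and compute the statistics in a few small queries so that projection push-down can prune unused columns:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class ColumnStatsSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse(); // avoid defensive copies between chained operators

        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<Tuple2<Integer, Double>> data = env.fromElements(
            Tuple2.of(1, 10.0), Tuple2.of(2, 20.0), Tuple2.of(3, 30.0));
        tEnv.registerDataSet("MYTABLE", data, "col1, col2");

        // One query per small group of columns keeps the generated plan manageable.
        Table stats = tEnv.sqlQuery(
            "SELECT MAX(col1), MIN(col1), AVG(col1), " +
            "       MAX(col2), MIN(col2), AVG(col2) " +
            "FROM MYTABLE");

        DataSet<Row> result = tEnv.toDataSet(stats, Row.class);
        result.print(); // print() triggers execution of this batch job
    }
}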



On Thu, Nov 29, 2018 at 12:08 PM Flavio Pompermaier <
pomperma...@okkam.it>:

> What do you advice to compute column stats?
> Should I run multiple job (one per column) or try to compute all at once?
>
> Are you ever going to consider supporting ANALYZE TABLE (like in Hive or
> Spark) in Flink Table API?
>
> Best,
> Flavio
>
> On Thu, Nov 29, 2018 at 9:45 AM Fabian Hueske  wrote:
>
>> Hi,
>>
>> You could try to enable object reuse.
>> Alternatively you can give more heap memory or fine tune the GC
>> parameters.
>>
>> I would not consider it a bug in Flink, but might be something that could
>> be improved.
>>
>> Fabian
>>
>>
>> On Wed, Nov 28, 2018 at 6:19 PM Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> Hi to all,
>>> I have a batch dataset  and I want to get some standard info about its
>>> columns (like min, max, avg etc).
>>> In order to achieve this I wrote a simple program that use SQL on table
>>> API like the following:
>>>
>>> SELECT
>>> MAX(col1), MIN(col1), AVG(col1),
>>> MAX(col2), MIN(col2), AVG(col2),
>>> MAX(col3), MIN(col3), AVG(col3)
>>> FROM MYTABLE
>>>
>>> In my dataset I have about 50 fields and the query becomes quite big
>>> (and the job plan too).
>>> It seems that this kind of job cause the cluster to crash (too much
>>> garbage collection).
>>> Is there any smarter way to achieve this goal (apart from running a job
>>> per column)?
>>> Is this "normal" or is this a bug of Flink?
>>>
>>> Best,
>>> Flavio
>>>
>>
>


Will Zookeeper HA work when the cluster is run in standalonejob mode?

2018-11-29 Thread Sergei Poganshev
If I run the cluster in "standalonejob" mode (by providing the job
arguments to the job manager upon starting it) and configure HA using
ZooKeeper, will the job restore correctly after the job manager restarts
with the same "standalonejob" arguments?

Will it restart the job (due to the job arguments passed) or will the HA
state have higher priority?


Re: Dulicated messages in kafka sink topic using flink cancel-with-savepoint operation

2018-11-29 Thread Piotr Nowojski
Hi Nastaran,

When you are checking for duplicated messages, are you reading from Kafka using 
`read_committed` mode (this is not the default value)?

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer-partitioning-scheme

> Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once 
> semantic. Whenever you write to Kafka using
> transactions, do not forget about setting desired isolation.level 
> (read_committed or read_uncommitted - the latter one is the
> default value) for any application consuming records from Kafka.

Does the problem happens always?

Piotrek
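
For reference, a minimal sketch (Flink 1.6 with the flink-connector-kafka-0.11 dependency assumed; topic names, broker addresses, and the timeout value are placeholders) of the point above: a producer configured with Semantic.EXACTLY_ONCE, combined with a consumer that reads with isolation.level=read_committed so uncommitted/aborted transactions are not visible downstream:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // transactions are committed when checkpoints complete

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "localhost:9092");
        consumerProps.setProperty("group.id", "my-group");
        // read only committed records; the Kafka default is read_uncommitted
        consumerProps.setProperty("isolation.level", "read_committed");

        DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer011<>("input-topic", new SimpleStringSchema(), consumerProps));

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9092");
        // keep the transaction timeout below the broker's transaction.max.timeout.ms
        producerProps.setProperty("transaction.timeout.ms", "600000");

        stream.addSink(new FlinkKafkaProducer011<>(
            "output-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            producerProps,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once-kafka-sketch");
    }
}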

> On 28 Nov 2018, at 08:56, Nastaran Motavali  wrote:
> 
> Hi,
> I have a flink streaming job implemented via java which reads some messages 
> from a kafka topic, transforms them and finally sends them to another kafka 
> topic.
> The version of flink is 1.6.2 and the kafka version is 011. I pass the 
> Semantic.EXACTLY_ONCE parameter to the producer. The problem is that when I 
> cancel the job with savepoint and then restart it using the saved savepoint, 
> I have duplicated messages in the sink.
> Do I miss some kafka/flink configurations to avoid duplication?
> 
> 
> Kind regards,
> Nastaran Motavalli



Re: Check-pointing error

2018-11-29 Thread Chesnay Schepler
Would it be possible for you to try this with 1.6-SNAPSHOT? This issue 
may have been fixed with https://issues.apache.org/jira/browse/FLINK-10839.


On 29.11.2018 12:11, Felipe Quirce wrote:

Hi

I'm using Flink 1.6.2, and the full stack trace is:

java.io.IOException: Exception while applying AggregateFunction in 
aggregating state
 4308 at 
org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
 4309 at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
 4310 at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
 4311 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
 4312 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 4313 at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

 4314 at java.lang.Thread.run(Thread.java:748)
 4315 Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
 4316 at 
com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)

 4317 at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
 4318 at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:129)
 4319 at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)

 4320 at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
 4321 at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
 4322 at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
 4323 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
 4324 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)

 4325 at scala.collection.immutable.List.foreach(List.scala:392)
 4326 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
 4327 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
 4328 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
 4329 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
 4330 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
 4331 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
 4332 at 
scala.collection.Iterator$class.foreach(Iterator.scala:891)
 4333 at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 4334 at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 4335 at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 4336 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
 4337 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
 4338 at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
 4339 at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
 4340 at 
org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)

 4341 ... 6 more

Thanks

On Thu, 29 Nov 2018 at 11:55, Felipe Quirce > wrote:


Hi,

I have found a problem during the checkpoint.
Could anyone help me or help me to debug it?
Exception:

1804 2018-11-29 11:31:00,448 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
keyedstats-processor-165 -> map2alert-165 -> Process -> Sink:
sink-level165 (1/2) (d860069560a4e3e6a62a450c9e3fa699)
switched from RUNNING to FAILED.
51805 java.io.IOException: Exception while applying
AggregateFunction in aggregating state
51806 at

org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
51807 at

org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
51808 at

org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
51809 at

org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
51810 at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)

Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-29 Thread Kostas Kloudas
Sorry, previously I got confused and I assumed you were using Flink's
StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
  Path...(MY_PATH),
  ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()


Cheers,
Kostas
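
For reference, a minimal, self-contained sketch (Flink 1.6 with the flink-parquet and flink-avro dependencies assumed; the schema, output path, and checkpoint interval are placeholders) of the suggested approach. A bulk-format StreamingFileSink only finalizes part files when checkpoints complete, so checkpointing must be enabled, and the sink has to be attached with addSink(sink):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Part files of a bulk-format sink are only committed when a checkpoint completes.
        env.enableCheckpointing(10_000);

        String schemaString =
            "{\"type\":\"record\",\"name\":\"Sensor\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts\",\"type\":\"long\"}]}";
        Schema schema = new Schema.Parser().parse(schemaString);

        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "test");
        record.put("ts", 100L);

        DataStream<GenericRecord> records = env.fromElements(record); // stands in for the real source

        StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(new Path("/tmp/parquet-out"),
                           ParquetAvroWriters.forGenericRecord(schema))
            .build();

        // Attach the sink itself; returning it from a lambda does not register it.
        records.addSink(sink);

        env.execute("parquet-sink-sketch");
    }
}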

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi  wrote:

> Thanks.
> yes, the *env.execute* is called and enabled checkpoints
> I think the problem is where to place the *writer.close *to flush the
> cache
> If I'll place on the sink after the write event e.g
> addSink{
> writer.write
> writer.close
> }
> in this case only the first record will be included in the file but not
> the rest of the stream.
>
>
> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi again Avi,
>>
>> In the first example that you posted (the one with the Kafka source), do
>> you call env.execute()?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Avi,
>>>
>>> In the last snippet that you posted, you have not activated checkpoints.
>>>
>>> Checkpoints are needed for the StreamingFileSink to produce results,
>>> especially in the case of BulkWriters (like Parquet) where
>>> the part file is rolled upon reception of a checkpoint and the part is
>>> finalised (i.e. "committed") when the checkpoint gets completed
>>> successfully.
>>>
>>> Could you please enable checkpointing and make sure that the job runs
>>> long enough for at least some checkpoints to be completed?
>>>
>>> Thanks a lot,
>>> Kostas
>>>
>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi 
>>> wrote:
>>>
 Checkout this little App. you can see that the file is created but no
 data is written. even for a single record

 import io.eels.component.parquet.ParquetWriterConfig
 import org.apache.avro.Schema
 import org.apache.avro.generic.{ GenericData, GenericRecord }
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import scala.io.Source
 import org.apache.flink.streaming.api.scala._

 object Tester extends App {
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   def now = System.currentTimeMillis()
   val path = new Path(s"test-$now.parquet")
   val schemaString = 
 Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
   val schema: Schema = new Schema.Parser().parse(schemaString)
   val compressionCodecName = CompressionCodecName.SNAPPY
   val config = ParquetWriterConfig()
   val genericReocrd: GenericRecord = new GenericData.Record(schema)
   genericReocrd.put("name", "test_b")
   genericReocrd.put("code", "NoError")
   genericReocrd.put("ts", 100L)
   val stream = env.fromElements(genericReocrd)
   val writer: ParquetWriter[GenericRecord] = 
 AvroParquetWriter.builder[GenericRecord](path)
 .withSchema(schema)
 .withCompressionCodec(compressionCodecName)
 .withPageSize(config.pageSize)
 .withRowGroupSize(config.blockSize)
 .withDictionaryEncoding(config.enableDictionary)
 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
 .withValidation(config.validating)
 .build()

   writer.write(genericReocrd)
   stream.addSink { r =>
 println(s"In Sink $r")
 writer.write(r)
   }
   env.execute()
   //  writer.close()
 }


 On Thu, Nov 29, 2018 at 6:57 AM vipul singh  wrote:

> Can you try closing the writer?
>
> AvroParquetWriter has an internal buffer. Try doing a .close() in
> snapshot()( since you are checkpointing hence this method will be called)
>
> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi 
> wrote:
>
>> Thanks Rafi,
>> I am actually not using assignTimestampsAndWatermarks , I will try
>> to add it as you suggested. however it seems that the messages I 
>> repeating
>> in the stream over and over even if I am pushing single message manually 
>> to
>> the queue, that message will repeat infinity
>>
>> Cheers
>> Avi
>>
>>
>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch 
>> wrote:
>>
>>> Hi Avi,
>>>
>>> I can't see the part where you use  assignTimestampsAndWatermarks.
>>> If this part in not set properly, it's possible that watermarks are
>>> not sent and nothing will be written to your Sink.
>>>
>>> See here for more details:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>
>>> Hope this helps,
>>> Rafi
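
For reference, a minimal sketch (Flink 1.6 assumed; the 10-second bound and the record type are illustrative) of the assignTimestampsAndWatermarks step mentioned in the quoted advice above, using a bounded-out-of-orderness extractor so event-time operators can fire:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // (name, event timestamp in ms) -- stands in for the Kafka source of this thread
        DataStream<Tuple2<String, Long>> events = env.fromElements(
            Tuple2.of("a", 1000L), Tuple2.of("b", 2000L), Tuple2.of("c", 15000L));

        DataStream<Tuple2<String, Long>> withTimestamps = events.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element) {
                    return element.f1; // the event-time field of the record
                }
            });

        withTimestamps.print();
        env.execute("watermark-sketch");
    }
}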
>>>
>>> On Wed, Nov 28, 2018, 21:22 Avi 

Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-29 Thread Avi Levi
Thanks.
Yes, *env.execute* is called and checkpoints are enabled.
I think the problem is where to place the *writer.close* to flush the cache.
If I place it in the sink after the write event, e.g.
addSink{
writer.write
writer.close
}
then only the first record is included in the file, but not the
rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas 
wrote:

> Hi again Avi,
>
> In the first example that you posted (the one with the Kafka source), do
> you call env.execute()?
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Avi,
>>
>> In the last snippet that you posted, you have not activated checkpoints.
>>
>> Checkpoints are needed for the StreamingFileSink to produce results,
>> especially in the case of BulkWriters (like Parquet) where
>> the part file is rolled upon reception of a checkpoint and the part is
>> finalised (i.e. "committed") when the checkpoint gets completed
>> successfully.
>>
>> Could you please enable checkpointing and make sure that the job runs
>> long enough for at least some checkpoints to be completed?
>>
>> Thanks a lot,
>> Kostas
>>
>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi  wrote:
>>
>>> Checkout this little App. you can see that the file is created but no
>>> data is written. even for a single record
>>>
>>> import io.eels.component.parquet.ParquetWriterConfig
>>> import org.apache.avro.Schema
>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.hadoop.fs.Path
>>> import org.apache.parquet.avro.AvroParquetWriter
>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>> import scala.io.Source
>>> import org.apache.flink.streaming.api.scala._
>>>
>>> object Tester extends App {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   def now = System.currentTimeMillis()
>>>   val path = new Path(s"test-$now.parquet")
>>>   val schemaString = 
>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>   val config = ParquetWriterConfig()
>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>   genericReocrd.put("name", "test_b")
>>>   genericReocrd.put("code", "NoError")
>>>   genericReocrd.put("ts", 100L)
>>>   val stream = env.fromElements(genericReocrd)
>>>   val writer: ParquetWriter[GenericRecord] = 
>>> AvroParquetWriter.builder[GenericRecord](path)
>>> .withSchema(schema)
>>> .withCompressionCodec(compressionCodecName)
>>> .withPageSize(config.pageSize)
>>> .withRowGroupSize(config.blockSize)
>>> .withDictionaryEncoding(config.enableDictionary)
>>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>> .withValidation(config.validating)
>>> .build()
>>>
>>>   writer.write(genericReocrd)
>>>   stream.addSink { r =>
>>> println(s"In Sink $r")
>>> writer.write(r)
>>>   }
>>>   env.execute()
>>>   //  writer.close()
>>> }
>>>
>>>
>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh  wrote:
>>>
 Can you try closing the writer?

 AvroParquetWriter has an internal buffer. Try doing a .close() in
 snapshot()( since you are checkpointing hence this method will be called)

 On Wed, Nov 28, 2018 at 7:33 PM Avi Levi 
 wrote:

> Thanks Rafi,
> I am actually not using assignTimestampsAndWatermarks , I will try to
> add it as you suggested. however it seems that the messages I repeating in
> the stream over and over even if I am pushing single message manually to
> the queue, that message will repeat infinity
>
> Cheers
> Avi
>
>
> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch 
> wrote:
>
>> Hi Avi,
>>
>> I can't see the part where you use  assignTimestampsAndWatermarks.
>> If this part in not set properly, it's possible that watermarks are
>> not sent and nothing will be written to your Sink.
>>
>> See here for more details:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>
>> Hope this helps,
>> Rafi
>>
>> On Wed, Nov 28, 2018, 21:22 Avi Levi >
>>> Hi,
>>>
>>> I am trying to implement Parquet Writer as SinkFunction. The
>>> pipeline consists of kafka as source and parquet file as a sink however 
>>> it
>>> seems like the stream is repeating itself like endless loop and the 
>>> parquet
>>> file is not written . can someone please help me with this?
>>>
>>> object ParquetSinkWriter{
>>>   private val path = new Path("tmp/pfile")
>>>   private val schemaString =
>>> 

Re: Questions about UDTF in flink SQL

2018-11-29 Thread Timo Walther

Hi Wangsan,

currently, UDFs have very strict result type assumptions. This is 
necessary to determine the serializers for the cluster. There were 
multiple requests for more flexible handling of types in UDFs.


Please have a look at:
- [FLINK-7358] Add implicitly converts support for User-defined function
- [FLINK-9294] [table] Improve type inference for UDFs with composite 
parameter and/or result type

- [FLINK-10958] [table] Add overload support for user defined function

If you think those issues do not represent what you need, you can open a 
new issue with a little example of the feature you think is missing.


Regards,
Timo
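
For illustration, a minimal sketch (Flink 1.6 Java Table API assumed; the function and field names are made up) of a table function with a fixed, deterministic result type, which is the constraint described above:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;

public class SplitFunction extends TableFunction<Tuple2<String, Integer>> {

    private final String separator;

    public SplitFunction(String separator) {
        this.separator = separator;
    }

    // eval() may be overloaded, but every overload emits the same row type
    public void eval(String input) {
        for (String s : input.split(separator)) {
            collect(Tuple2.of(s, s.length()));
        }
    }
}

It could then be registered with tableEnv.registerFunction("split", new SplitFunction(",")) and used as LATERAL TABLE(split(input)) AS T(word, len); what is currently not possible is letting the row type of T depend on the arguments passed at the call site.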


On 28.11.18 at 09:59, wangsan wrote:

Hi all,

When using a user-defined table function in Flink SQL, it seems that the result 
type of a table function must be deterministic.

If I want a UDTF whose result type is determined by its input parameters, what 
should I do?

What I want to do is like this:

```
SELECT input, f1, f2 length FROM MyTable, LATERAL TABLE(unnest_udtf(input, v1, 
v2)) as T(f1, f2), LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as T(f3, f4, 
f5)
```

I can surely register the same UDTF with different names and configurations, but 
I guess that's not a good idea :(.

If we cannot make this work in Flink SQL for now, maybe we should consider this 
feature in the future?

Best,
wangsan





Re: Check-pointing error

2018-11-29 Thread Felipe Quirce
Hi

I'm using Flink 1.6.2, and the full stack trace is:

java.io.IOException: Exception while applying AggregateFunction in
aggregating state
 4308 at
org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
 4309 at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
 4310 at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
 4311 at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
 4312 at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 4313 at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 4314 at
java.lang.Thread.run(Thread.java:748)
 4315 Caused by: java.lang.ArrayIndexOutOfBoundsException:
-1
 4316 at
com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
 4317 at
com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
 4318 at
com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:129)
 4319 at
com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
 4320 at
com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
 4321 at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
 4322 at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
 4323 at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
 4324 at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
 4325 at
scala.collection.immutable.List.foreach(List.scala:392)
 4326 at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
 4327 at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
 4328 at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
 4329 at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
 4330 at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
 4331 at
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
 4332 at
scala.collection.Iterator$class.foreach(Iterator.scala:891)
 4333 at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 4334 at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 4335 at
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 4336 at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
 4337 at
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
 4338 at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
 4339 at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
 4340 at
org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
 4341 ... 6 more

Thanks

On Thu, 29 Nov 2018 at 11:55, Felipe Quirce  wrote:

> Hi,
>
> I have found a problem during the checkpoint.
> Could anyone help me or help me to debug it?
> Exception:
>
>> 1804 2018-11-29 11:31:00,448 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>> - keyedstats-processor-165 -> map2alert-165   -> Process -> Sink:
>> sink-level165 (1/2) (d860069560a4e3e6a62a450c9e3fa699) switched from
>> RUNNING to FAILED.
>> 51805 java.io.IOException: Exception while applying AggregateFunction in
>> aggregating state
>> 51806 at
>> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>> 51807 at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
>> 51808 at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> 51809 at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> 51810 at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> 51811 at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> 51812 at
>> java.lang.Thread.run(Thread.java:748)
>> 51813 Caused by: java.lang.ArrayIndexOutOfBoundsException
>>
>
> Thanks in Advance,
>


Re: Dataset column statistics

2018-11-29 Thread Flavio Pompermaier
What do you advise for computing column stats?
Should I run multiple jobs (one per column) or try to compute everything at once?

Are you ever going to consider supporting ANALYZE TABLE (like in Hive or
Spark) in the Flink Table API?

Best,
Flavio

On Thu, Nov 29, 2018 at 9:45 AM Fabian Hueske  wrote:

> Hi,
>
> You could try to enable object reuse.
> Alternatively you can give more heap memory or fine tune the GC parameters.
>
> I would not consider it a bug in Flink, but might be something that could
> be improved.
>
> Fabian
>
>
> On Wed, Nov 28, 2018 at 6:19 PM Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> Hi to all,
>> I have a batch dataset  and I want to get some standard info about its
>> columns (like min, max, avg etc).
>> In order to achieve this I wrote a simple program that use SQL on table
>> API like the following:
>>
>> SELECT
>> MAX(col1), MIN(col1), AVG(col1),
>> MAX(col2), MIN(col2), AVG(col2),
>> MAX(col3), MIN(col3), AVG(col3)
>> FROM MYTABLE
>>
>> In my dataset I have about 50 fields and the query becomes quite big (and
>> the job plan too).
>> It seems that this kind of job cause the cluster to crash (too much
>> garbage collection).
>> Is there any smarter way to achieve this goal (apart from running a job
>> per column)?
>> Is this "normal" or is this a bug of Flink?
>>
>> Best,
>> Flavio
>>
>


Re: Check-pointing error

2018-11-29 Thread Chesnay Schepler
Please provide the full exception stack trace and version of Flink that 
you are using.


On 29.11.2018 11:55, Felipe Quirce wrote:

Hi,

I have found a problem during the checkpoint.
Could anyone help me or help me to debug it?
Exception:

1804 2018-11-29 11:31:00,448 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
keyedstats-processor-165 -> map2alert-165   -> Process ->
Sink: sink-level165 (1/2) (d860069560a4e3e6a62a450c9e3fa699)
switched from RUNNING to FAILED.
51805 java.io.IOException: Exception while applying
AggregateFunction in aggregating state
51806 at

org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
51807 at

org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
51808 at

org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
51809 at

org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
51810 at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
51811 at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
51812 at java.lang.Thread.run(Thread.java:748)
51813 Caused by: java.lang.ArrayIndexOutOfBoundsException


Thanks in Advance,





Re: Looking for relevant sources related to connecting Apache Flink and Edgent.

2018-11-29 Thread Felipe Gutierrez
Thanks Kostas for the quick reply,

Yes, it is related to my previous question.

When you said "But if you know what operation to push down" -> this is what
I am trying to find in the Flink code. I want to know the operations on the
fly, i.e. the component in Flink that tells me there is a filter in the
query specified by the user. I want to get this metadata and send a message
to my RPi through a Flink connector (I guess this is the way to do it), so
that the data stream arrives at Flink already filtered.

I intend to start with a simple and naive example. Do you know from which
component in Flink I can get, on the fly, the operations that are running
inside a query?

thanks
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Thu, Nov 29, 2018 at 11:18 AM Kostas Kloudas 
wrote:

> Hi again,
>
> I forgot to say that, unfortunately, I am not familiar with Apache Edgent,
> but if you can write your filter in Edgent's programming model,
> Then you can push your data from Edgent to a third party storage system
> (e.g. Kafka, HDFS, etc) and use Flink's connectors, instead of
> having to implement a custom source.
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 11:08 AM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Felipe,
>>
>> This seems related to your previous question about a custom scheduler
>> that knows which task to run on which machine.
>> As Chesnay said, this is a rather involved and laborious task, if you
>> want to do it as a general framework.
>>
>> But if you know what operation to push down, then why not decoupling the
>> two and implementing the filtering as a separate job
>> running on your Raspberry and a new job which consumes the output of the
>> first and does the analytics?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Nov 29, 2018 at 10:23 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to design a little prototype with Flink and Apache Edgent (
>>> http://edgent.apache.org/) and I would like some help on the direction
>>> for it. I am running Flink at my laptop and Edgent on my Raspberry Pi with
>>> a simple filter for a proximity sensor (
>>> https://github.com/felipegutierrez/explore-rpi/blob/master/src/main/java/org/sense/edgent/app/UltrasonicEdgentApp.java
>>> ).
>>>
>>> My idea is to push down the filter operator from Flink to the Raspberry
>>> Pi which is running Apache Edgent. With this in mind, where do you guys
>>> advise me to start?
>>>
>>> I have some ideas to study...
>>> 1 - Try to get the list of operators that Flink is about to execute on
>>> the JobManager. source:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
>>> 2 - Implement a connector to Apache Edgent in order to exchange messages
>>> between Flink and Edgent.
>>>
>>> Do you guys think in another source that is interesting regarding my
>>> prototype?
>>>
>>> Thanks,
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> *
>>>
>>


Check-pointing error

2018-11-29 Thread Felipe Quirce
Hi,

I have found a problem during checkpointing.
Could anyone help me debug it?
Exception:

> 1804 2018-11-29 11:31:00,448 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
> - keyedstats-processor-165 -> map2alert-165   -> Process -> Sink:
> sink-level165 (1/2) (d860069560a4e3e6a62a450c9e3fa699) switched from
> RUNNING to FAILED.
> 51805 java.io.IOException: Exception while applying AggregateFunction in
> aggregating state
> 51806 at
> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
> 51807 at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
> 51808 at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> 51809 at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> 51810 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> 51811 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 51812 at
> java.lang.Thread.run(Thread.java:748)
> 51813 Caused by: java.lang.ArrayIndexOutOfBoundsException
>

Thanks in Advance,


Re: SQL Query named operator exceeds 80 characters

2018-11-29 Thread Chesnay Schepler
This is a safeguard in the metric system to prevent extremely long names 
(as these could cause the reporting to fail); so long as the prefix is 
unique you can safely ignore this warning.


On 29.11.2018 10:40, Timo Walther wrote:

Unfortunately, renaming of operators is not supported so far.

We are currently thinking about a way of having fine-grained control 
over the properties of SQL operators, but this is in an early design phase 
and might take a while.


Regards,
Timo

On 29.11.18 at 10:32, Kostas Kloudas wrote:

Hi,

I think that you cannot set it explicitly.

The reason I would say that is because the SQL query gets parsed through 
Calcite and then gets translated to a DataStream program through a process 
that is rather opaque to the user.

That said, I also cc'ed Fabian and Timo who know more on the topic.

Cheers,
Kostas








Re: Checkpointing to gcs taking too long

2018-11-29 Thread Chesnay Schepler
Please provide the full Exception stack trace and the configuration of 
your job (parallelism, number of stateful operators).
Have you tried using the gcs-connector in isolation? This may not be an 
issue with Flink.


On 28.11.2018 07:01, prakhar_mathur wrote:

I am trying to run flink on kubernetes, and trying to push checkpoints to
Google Cloud Storage. Below is the docker file

`FROM flink:1.6.2-hadoop28-scala_2.11-alpine

RUN wget -O lib/gcs-connector-latest-hadoop2.jar
https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar

RUN wget -O lib/gcs-connector-latest-hadoop2.jar
https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar
&& \
wget
http://ftp.fau.de/apache/flink/flink-1.6.2/flink-1.6.2-bin-hadoop28-scala_2.11.tgz
&& \
tar xf flink-1.6.2-bin-hadoop28-scala_2.11.tgz && \
mv flink-1.6.2/lib/flink-shaded-hadoop2* lib/  && \
rm -r flink-1.6.2*`

But the checkpoints are taking around 2-3 seconds on average and around 25
seconds at max, even though the state size is only around 100 KB.

The jobs are even getting restarted with the error
`AsynchronousException{java.lang.Exception: Could not materialize checkpoint
1640 for operator groupBy` and sometimes lose their connections with the task
managers.

Currently, I have given the heap size of 4096 MB.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Looking for relevant sources related to connecting Apache Flink and Edgent.

2018-11-29 Thread Kostas Kloudas
Hi again,

I forgot to say that, unfortunately, I am not familiar with Apache Edgent,
but if you can write your filter in Edgent's programming model,
then you can push your data from Edgent to a third-party storage system
(e.g. Kafka, HDFS, etc.) and use Flink's connectors, instead of
having to implement a custom source.

Cheers,
Kostas
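
If that route sounds reasonable, a minimal sketch of the Flink side is below. It assumes
the Edgent application publishes its filtered sensor readings to a Kafka topic; the topic
name, the broker address and the flink-connector-kafka-0.11 dependency are assumptions:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object EdgentKafkaBridgeSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address
  props.setProperty("group.id", "flink-edgent-consumer")

  // "edgent-filtered" is a hypothetical topic the Edgent app writes its filtered readings to
  val filteredReadings = env.addSource(
    new FlinkKafkaConsumer011[String]("edgent-filtered", new SimpleStringSchema(), props))

  // the analytics stay in Flink
  filteredReadings.map(r => s"analyzed: $r").print()

  env.execute("edgent-kafka-bridge-sketch")
}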

On Thu, Nov 29, 2018 at 11:08 AM Kostas Kloudas 
wrote:

> Hi Felipe,
>
> This seems related to your previous question about a custom scheduler that
> knows which task to run on which machine.
> As Chesnay said, this is a rather involved and laborious task, if you want
> to do it as a general framework.
>
> But if you know what operation to push down, then why not decouple the
> two and implement the filtering as a separate job
> running on your Raspberry and a new job which consumes the output of the
> first and does the analytics?
>
> Cheers,
> Kostas
>
> On Thu, Nov 29, 2018 at 10:23 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to design a little prototype with Flink and Apache Edgent (
>> http://edgent.apache.org/) and I would like some help on the direction
>> for it. I am running Flink on my laptop and Edgent on my Raspberry Pi with
>> a simple filter for a proximity sensor (
>> https://github.com/felipegutierrez/explore-rpi/blob/master/src/main/java/org/sense/edgent/app/UltrasonicEdgentApp.java
>> ).
>>
>> My idea is to push down the filter operator from Flink to the Raspberry
>> Pi which is running Apache Edgent. With this in mind, where do you guys
>> advise me to start?
>>
>> I have some ideas to study...
>> 1 - Try to get the list of operators that Flink is about to execute on
>> the JobManager. source:
>> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
>> 2 - Implement a connector to Apache Edgent in order to exchange messages
>> between Flink and Edgent.
>>
>> Can you think of another source that would be interesting for my
>> prototype?
>>
>> Thanks,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>


Re: Looking for relevant sources related to connecting Apache Flink and Edgent.

2018-11-29 Thread Kostas Kloudas
Hi Felipe,

This seems related to your previous question about a custom scheduler that
knows which task to run on which machine.
As Chesnay said, this is a rather involved and laborious task, if you want
to do it as a general framework.

But if you know what operation to push down, then why not decouple the
two and implement the filtering as a separate job
running on your Raspberry and a new job which consumes the output of the
first and does the analytics?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:23 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I am trying to design a little prototype with Flink and Apache Edgent (
> http://edgent.apache.org/) and I would like some help on the direction
> for it. I am running Flink on my laptop and Edgent on my Raspberry Pi with
> a simple filter for a proximity sensor (
> https://github.com/felipegutierrez/explore-rpi/blob/master/src/main/java/org/sense/edgent/app/UltrasonicEdgentApp.java
> ).
>
> My idea is to push down the filter operator from Flink to the Raspberry Pi
> which is running Apache Edgent. With this in mind, where do you guys advise
> me to start?
>
> I have some ideas to study...
> 1 - Try to get the list of operators that Flink is about to execute on the
> JobManager. source:
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
> 2 - Implement a connector to Apache Edgent in order to exchange messages
> between Flink and Edgent.
>
> Can you think of another source that would be interesting for my
> prototype?
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


Re: number of files in checkpoint directory grows endlessly

2018-11-29 Thread Andrey Zagrebin
If you use incremental checkpoints, the state backend stores raw RocksDB SST files 
which represent all state data. Each checkpoint adds the SST files with new updates 
which are not present in the previous checkpoint, basically their difference.

One of the following could be happening:
- old keys are neither explicitly deleted nor expired (depending on how TtlDB is used)
- compaction is too slow to drop older SST files that are still referenced by the latest 
checkpoint, so they cannot yet be deleted together with the previous checkpoints
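
If the first case applies, one option is to clear keyed state explicitly once a key is no
longer needed, so that compaction can eventually drop the corresponding entries. A minimal
sketch with a ProcessFunction on a keyed stream and an event-time cleanup timer; the Event
type, the 24h retention period and the presence of event-time timestamps are assumptions:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, payload: String)

class StateCleaningFunction extends ProcessFunction[Event, Event] {
  private val retentionMs = 24 * 60 * 60 * 1000L

  private lazy val lastSeen: ValueState[java.lang.Long] =
    getRuntimeContext.getState(new ValueStateDescriptor("last-seen", classOf[java.lang.Long]))

  override def processElement(event: Event,
                              ctx: ProcessFunction[Event, Event]#Context,
                              out: Collector[Event]): Unit = {
    lastSeen.update(ctx.timestamp())
    // re-armed every time the key is seen; fires only once the key has been idle
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + retentionMs)
    out.collect(event)
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[Event, Event]#OnTimerContext,
                       out: Collector[Event]): Unit = {
    val last = lastSeen.value()
    if (last != null && timestamp >= last + retentionMs) {
      lastSeen.clear() // removes the entry so RocksDB compaction can eventually drop it
    }
  }
}

// usage on a keyed stream, e.g.: events.keyBy(_.key).process(new StateCleaningFunction)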

> On 29 Nov 2018, at 10:48,  
>  wrote:
> 
> Hi
> We use Flink 1.6.2. As for the checkpoint directory, there is only one 
> chk-xxx directory. Therefore I would expect only one checkpoint to remain.
> The value of 'state.checkpoints.num-retained' is not set explicitly.
> 
> The problem is not the number of checkpoints but the number of files in the 
> "shared" directory next to the chk-xxx directory.
> 
> 
> -Ursprüngliche Nachricht-
> Von: Andrey Zagrebin [mailto:and...@data-artisans.com]
> Gesendet: Donnerstag, 29. November 2018 10:39
> An: Kostas Kloudas
> Cc: Winterstein, Bernd; user; Stefan Richter; Till Rohrmann; Stephan Ewen
> Betreff: Re: number of files in checkpoint directory grows endlessly
> 
> Hi Bernd,
> 
> Did you change 'state.checkpoints.num-retained' in flink-conf.yaml? By 
> default, only one checkpoint should be retained.
> 
> Which version of Flink do you use?
> Can you check the Job Master logs for a warning like this:
> `Fail to subsume the old checkpoint`?
> 
> Best,
> Andrey
> 
>> On 29 Nov 2018, at 10:18, Kostas Kloudas  wrote:
>> 
>> Hi Bernd,
>> 
>> I think the Till, Stefan or Stephan (cc'ed) are the best to answer your 
>> question.
>> 
>> Cheers,
>> Kostas
> 
> 
> 
> 



Re: Memory does not be released after job cancellation

2018-11-29 Thread Kostas Kloudas
Hi Nastaran,

Can you specify what additional information you need?

From the discussion that you posted:
1) If you have batch jobs, then Flink does its own memory management
(outside the heap, so it is not subject to the JVM's GC),
and although you do not see the memory being de-allocated when you
cancel the job,
this memory is available to other jobs and you do not have to worry
about de-allocating it manually.
2) If you use streaming, then you should use one of the provided state
backends and they will do the memory management
for you (see [1] and [2]).

Cheers,
Kostas

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/large_state_tuning.html
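
For the streaming case, a minimal sketch of configuring such a state backend; the checkpoint
URI is a placeholder, and the RocksDB backend additionally needs the flink-statebackend-rocksdb
dependency:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

object StateBackendSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // keeps working state in RocksDB (off-heap) and checkpoints it to the given URI
  env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"))
  env.enableCheckpointing(60000)
  // ... define the rest of the job and call env.execute() as usual
}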

On Wed, Nov 28, 2018 at 7:11 AM Nastaran Motavali  wrote:

> Hi,
> I have a simple Java application that uses Flink 1.6.2.
> When I run the jar file, I can see that the job consumes part of the
> host's main memory. If I cancel the job, the consumed memory is not
> released until I stop the whole cluster. How can I release the memory after
> cancellation?
> I have followed the conversation around this issue in the mailing list
> archive[1] but still need more explanation.
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Need-help-to-understand-memory-consumption-td23821.html#a23926
>
>
>
> Kind regards,
>
> Nastaran Motavalli
>
>
>
>


AW: number of files in checkpoint directory grows endlessly

2018-11-29 Thread Bernd.Winterstein
Hi
We use Flink 1.6.2. As for the checkpoint directory, there is only one chk-xxx 
directory. Therefore I would expect only one checkpoint to remain.
The value of 'state.checkpoints.num-retained' is not set explicitly.

The problem is not the number of checkpoints but the number of files in the 
"shared" directory next to the chk-xxx directory.


-Ursprüngliche Nachricht-
Von: Andrey Zagrebin [mailto:and...@data-artisans.com]
Gesendet: Donnerstag, 29. November 2018 10:39
An: Kostas Kloudas
Cc: Winterstein, Bernd; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Betreff: Re: number of files in checkpoint directory grows endlessly

Hi Bernd,

Did you change 'state.checkpoints.num-retained' in flink-conf.yaml? By default, 
only one checkpoint should be retained.

Which version of Flink do you use?
Can you check the Job Master logs for a warning like this:
`Fail to subsume the old checkpoint`?

Best,
Andrey

> On 29 Nov 2018, at 10:18, Kostas Kloudas  wrote:
>
> Hi Bernd,
>
> I think the Till, Stefan or Stephan (cc'ed) are the best to answer your 
> question.
>
> Cheers,
> Kostas






Re: number of files in checkpoint directory grows endlessly

2018-11-29 Thread Andrey Zagrebin
Hi Bernd,

Did you change 'state.checkpoints.num-retained' in flink-conf.yaml? By default, 
only one checkpoint should be retained.

Which version of Flink do you use?
Can you check the Job Master logs for a warning like this:
`Fail to subsume the old checkpoint`?

Best,
Andrey

> On 29 Nov 2018, at 10:18, Kostas Kloudas  wrote:
> 
> Hi Bernd,
> 
> I think the Till, Stefan or Stephan (cc'ed) are the best to answer your 
> question.
> 
> Cheers,
> Kostas



Re: SQL Query named operator exceeds 80 characters

2018-11-29 Thread Timo Walther

Unfortunately, renaming of operators is not supported so far.

We are currently thinking about a way of having fine-grained control 
over the properties of SQL operators, but this is in an early design phase 
and might take a while.


Regards,
Timo

Am 29.11.18 um 10:32 schrieb Kostas Kloudas:

Hi,

I think that you cannot set it explicitly.

The reason I say that is that the SQL query gets parsed through Calcite 
and then translated into a DataStream program through a process that is 
rather opaque to the user.

That said, I also cc'ed Fabian and Timo who know more on the topic.

Cheers,
Kostas





Looking for relevant sources related to connecting Apache Flink and Edgent.

2018-11-29 Thread Felipe Gutierrez
Hi,

I am trying to design a little prototype with Flink and Apache Edgent (
http://edgent.apache.org/) and I would like some help on the direction for
it. I am running Flink on my laptop and Edgent on my Raspberry Pi with a
simple filter for a proximity sensor (
https://github.com/felipegutierrez/explore-rpi/blob/master/src/main/java/org/sense/edgent/app/UltrasonicEdgentApp.java
).

My idea is to push down the filter operator from Flink to the Raspberry Pi
which is running Apache Edgent. With this in mind, where do you guys advise
me to start?

I have some ideas to study...
1 - Try to get the list of operators that Flink is about to execute on the
JobManager. source:
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
2 - Implement a connector to Apache Edgent in order to exchange messages
between Flink and Edgent.

Can you think of another source that would be interesting for my
prototype?

Thanks,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


Re: number of files in checkpoint directory grows endlessly

2018-11-29 Thread Kostas Kloudas
Hi Bernd,

I think Till, Stefan or Stephan (cc'ed) are the best to answer your
question.

Cheers,
Kostas


Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-29 Thread Kostas Kloudas
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do
you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas 
wrote:

> Hi Avi,
>
> In the last snippet that you posted, you have not activated checkpoints.
>
> Checkpoints are needed for the StreamingFileSink to produce results,
> especially in the case of BulkWriters (like Parquet) where
> the part file is rolled upon reception of a checkpoint and the part is
> finalised (i.e. "committed") when the checkpoint gets completed
> successfully.
>
> Could you please enable checkpointing and make sure that the job runs long
> enough for at least some checkpoints to be completed?
>
> Thanks a lot,
> Kostas
>
> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi  wrote:
>
>> Check out this little app. You can see that the file is created but no
>> data is written, even for a single record.
>>
>> import io.eels.component.parquet.ParquetWriterConfig
>> import org.apache.avro.Schema
>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.hadoop.fs.Path
>> import org.apache.parquet.avro.AvroParquetWriter
>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>> import scala.io.Source
>> import org.apache.flink.streaming.api.scala._
>>
>> object Tester extends App {
>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>   def now = System.currentTimeMillis()
>>   val path = new Path(s"test-$now.parquet")
>>   val schemaString = 
>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>   val config = ParquetWriterConfig()
>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>   genericReocrd.put("name", "test_b")
>>   genericReocrd.put("code", "NoError")
>>   genericReocrd.put("ts", 100L)
>>   val stream = env.fromElements(genericReocrd)
>>   val writer: ParquetWriter[GenericRecord] = 
>> AvroParquetWriter.builder[GenericRecord](path)
>> .withSchema(schema)
>> .withCompressionCodec(compressionCodecName)
>> .withPageSize(config.pageSize)
>> .withRowGroupSize(config.blockSize)
>> .withDictionaryEncoding(config.enableDictionary)
>> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>> .withValidation(config.validating)
>> .build()
>>
>>   writer.write(genericReocrd)
>>   stream.addSink { r =>
>> println(s"In Sink $r")
>> writer.write(r)
>>   }
>>   env.execute()
>>   //  writer.close()
>> }
>>
>>
>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh  wrote:
>>
>>> Can you try closing the writer?
>>>
>>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>>> snapshot()( since you are checkpointing hence this method will be called)
>>>
>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi 
>>> wrote:
>>>
 Thanks Rafi,
 I am actually not using assignTimestampsAndWatermarks; I will try to
 add it as you suggested. However, it seems that the messages are repeating in
 the stream over and over: even if I push a single message manually to
 the queue, that message will repeat infinitely

 Cheers
 Avi


 On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch 
 wrote:

> Hi Avi,
>
> I can't see the part where you use  assignTimestampsAndWatermarks.
> If this part is not set properly, it's possible that watermarks are
> not sent and nothing will be written to your Sink.
>
> See here for more details:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> Hope this helps,
> Rafi
>
> On Wed, Nov 28, 2018, 21:22 Avi Levi 
>> Hi,
>>
>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
>> consists of Kafka as the source and a Parquet file as the sink; however, it seems
>> like the stream is repeating itself in an endless loop and the Parquet file
>> is not written. Can someone please help me with this?
>>
>> object ParquetSinkWriter{
>>   private val path = new Path("tmp/pfile")
>>   private val schemaString =
>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>   private val avroSchema: Schema = new
>> Schema.Parser().parse(schemaString)
>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>   private   val config = ParquetWriterConfig()
>>   val writer: ParquetWriter[GenericRecord] =
>> AvroParquetWriter.builder[GenericRecord](path)
>> .withSchema(avroSchema)
>> .withCompressionCodec(compressionCodecName)
>> .withPageSize(config.pageSize)
>> .withRowGroupSize(config.blockSize)
>> 

Re: Stream in loop and not getting to sink (Parquet writer )

2018-11-29 Thread Kostas Kloudas
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results,
especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is
finalised (i.e. "committed") when the checkpoint gets completed
successfully.

Could you please enable checkpointing and make sure that the job runs long
enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas
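
For reference, a minimal sketch of writing Parquet through the StreamingFileSink instead of a
hand-rolled SinkFunction. It assumes Flink 1.6+ with the flink-parquet and flink-avro modules on
the classpath; the schema resource, output path and source are placeholders:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object ParquetSinkSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(60000) // bulk formats roll and commit part files on checkpoints

  val schema: Schema = new Schema.Parser().parse(
    getClass.getResourceAsStream("/request_schema.avsc")) // placeholder schema resource

  val records: DataStream[GenericRecord] = ??? // plug in the real source here, e.g. a Kafka consumer

  val sink = StreamingFileSink
    .forBulkFormat(new Path("file:///tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(schema))
    .build()

  records.addSink(sink)
  env.execute("parquet-sink-sketch")
}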

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi  wrote:

> Check out this little app. You can see that the file is created but no data
> is written, even for a single record.
>
> import io.eels.component.parquet.ParquetWriterConfig
> import org.apache.avro.Schema
> import org.apache.avro.generic.{ GenericData, GenericRecord }
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.avro.AvroParquetWriter
> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
> import org.apache.parquet.hadoop.metadata.CompressionCodecName
> import scala.io.Source
> import org.apache.flink.streaming.api.scala._
>
> object Tester extends App {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   def now = System.currentTimeMillis()
>   val path = new Path(s"test-$now.parquet")
>   val schemaString = 
> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>   val schema: Schema = new Schema.Parser().parse(schemaString)
>   val compressionCodecName = CompressionCodecName.SNAPPY
>   val config = ParquetWriterConfig()
>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>   genericReocrd.put("name", "test_b")
>   genericReocrd.put("code", "NoError")
>   genericReocrd.put("ts", 100L)
>   val stream = env.fromElements(genericReocrd)
>   val writer: ParquetWriter[GenericRecord] = 
> AvroParquetWriter.builder[GenericRecord](path)
> .withSchema(schema)
> .withCompressionCodec(compressionCodecName)
> .withPageSize(config.pageSize)
> .withRowGroupSize(config.blockSize)
> .withDictionaryEncoding(config.enableDictionary)
> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
> .withValidation(config.validating)
> .build()
>
>   writer.write(genericReocrd)
>   stream.addSink { r =>
> println(s"In Sink $r")
> writer.write(r)
>   }
>   env.execute()
>   //  writer.close()
> }
>
>
> On Thu, Nov 29, 2018 at 6:57 AM vipul singh  wrote:
>
>> Can you try closing the writer?
>>
>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>> snapshot()( since you are checkpointing hence this method will be called)
>>
>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi  wrote:
>>
>>> Thanks Rafi,
>>> I am actually not using assignTimestampsAndWatermarks; I will try to
>>> add it as you suggested. However, it seems that the messages are repeating in
>>> the stream over and over: even if I push a single message manually to
>>> the queue, that message will repeat infinitely
>>>
>>> Cheers
>>> Avi
>>>
>>>
>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch 
>>> wrote:
>>>
 Hi Avi,

 I can't see the part where you use  assignTimestampsAndWatermarks.
 If this part is not set properly, it's possible that watermarks are not
 sent and nothing will be written to your Sink.

 See here for more details:
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

 Hope this helps,
 Rafi

 On Wed, Nov 28, 2018, 21:22 Avi Levi >>>
> Hi,
>
> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
> consists of Kafka as the source and a Parquet file as the sink; however, it seems
> like the stream is repeating itself in an endless loop and the Parquet file
> is not written. Can someone please help me with this?
>
> object ParquetSinkWriter{
>   private val path = new Path("tmp/pfile")
>   private val schemaString =
> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>   private val avroSchema: Schema = new
> Schema.Parser().parse(schemaString)
>   private val compressionCodecName = CompressionCodecName.SNAPPY
>   private   val config = ParquetWriterConfig()
>   val writer: ParquetWriter[GenericRecord] =
> AvroParquetWriter.builder[GenericRecord](path)
> .withSchema(avroSchema)
> .withCompressionCodec(compressionCodecName)
> .withPageSize(config.pageSize)
> .withRowGroupSize(config.blockSize)
> .withDictionaryEncoding(config.enableDictionary)
> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
> .withValidation(config.validating)
> .build()
> }
>
> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
> SinkFunction[GenericRecord] {
>   import ParquetSinkWriter._
>   override 

Re: Dataset column statistics

2018-11-29 Thread Fabian Hueske
Hi,

You could try to enable object reuse.
Alternatively, you can give the job more heap memory or fine-tune the GC parameters.

I would not consider it a bug in Flink, but it might be something that could
be improved.

Fabian
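
For completeness, a minimal sketch of enabling object reuse on the batch environment (worth
testing carefully, since downstream operators must not hold on to reused record instances):

import org.apache.flink.api.scala._

object ObjectReuseSketch extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment
  // reuse record instances instead of allocating new ones per element, reducing GC pressure
  env.getConfig.enableObjectReuse()
  // ... register the table and run the aggregation query as before
}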


Am Mi., 28. Nov. 2018 um 18:19 Uhr schrieb Flavio Pompermaier <
pomperma...@okkam.it>:

> Hi to all,
> I have a batch dataset and I want to get some standard info about its
> columns (like min, max, avg etc).
> In order to achieve this I wrote a simple program that use SQL on table
> API like the following:
>
> SELECT
> MAX(col1), MIN(col1), AVG(col1),
> MAX(col2), MIN(col2), AVG(col2),
> MAX(col3), MIN(col3), AVG(col3)
> FROM MYTABLE
>
> In my dataset I have about 50 fields and the query becomes quite big (and
> the job plan too).
> It seems that this kind of job causes the cluster to crash (too much
> garbage collection).
> Is there any smarter way to achieve this goal (apart from running a job
> per column)?
> Is this "normal" or is this a bug of Flink?
>
> Best,
> Flavio
>


how to stop hung job manager

2018-11-29 Thread Ali, Kasif
Hello,

Is there any config which can be set to stop/kill the job manager if the client 
has crashed due to some exception/error?

We see this in our application when the client has crashed due to some issue, like 
a failed REST call or an application crash.

In such cases the job manager continues to run, expecting jobs to be submitted. If 
the client is not sending any heartbeat to the JM, can we make it stop/kill itself if no 
heartbeat is received for some time?

Thanks,
Kasif






number of files in checkpoint directory grows endlessly

2018-11-29 Thread Bernd.Winterstein
I have a flink job running with the following settings:
*   CheckpointingMode.EXACTLY_ONCE
*   RocksDB backend (Modified with TtlDB usage)
*   CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
*   60 sec interval
*   Async snapshots
*   Incremental checkpoints
*   Queryable State enabled

After a few days I have ~5000 files in the checkpoints shared directory. It 
grows by ~1000 files/day.

Now I have the following questions:
1.  What kind of information is stored in the shared subdirectory?
2.  Is it safe to delete old files, or do we have to restart the job from a 
savepoint?

chk-9798:   139
shared: 4840
taskowned:  0

Regards

Bernd
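
For context, a minimal sketch of how a job with the settings above is typically wired up; the
checkpoint URI and interval are placeholders:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

object CheckpointSetupSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // 60 s interval, exactly-once mode
  env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

  // RocksDB backend with incremental checkpoints; the SST files land in the "shared" directory
  env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true))

  // ... sources, transformations, sinks, then env.execute()
}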



  

