[jira] [Created] (FLINK-7618) Add BINARY support in FlinkTypeFactory

2017-09-14 Thread sunjincheng (JIRA)
sunjincheng created FLINK-7618:
--

 Summary: Add BINARY support in FlinkTypeFactory
 Key: FLINK-7618
 URL: https://issues.apache.org/jira/browse/FLINK-7618
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: sunjincheng
Assignee: sunjincheng


We get the following exception when dealing with the BINARY type:

{code}
org.apache.flink.table.api.TableException: Type is not supported: BINARY

at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:377)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:741)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:104)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
{code}
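
Presumably the fix extends the type mapping in {{FlinkTypeFactory}}. A minimal sketch of the idea, assuming BINARY (and VARBINARY) should map to Flink's byte-array type information; the standalone method below is an illustration, not the committed fix:

{code}
// Hypothetical sketch: handle BINARY in the SqlTypeName-to-TypeInformation mapping.
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.SqlTypeName.{BINARY, VARBINARY}
import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}

object BinaryTypeMapping {
  def toTypeInfoSketch(sqlType: SqlTypeName): TypeInformation[_] = sqlType match {
    // Map SQL binary types to Flink's primitive byte-array type info (assumption).
    case BINARY | VARBINARY => PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
    case other => throw new RuntimeException(s"Type is not supported: $other")
  }
}
{code}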





Re: got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code

2017-09-14 Thread Till Rohrmann
Hi XiangWei,

the problem is that the LocalFlinkMiniCluster can no longer be used in
combination with a RemoteExecutionEnvironment. The reason is that the
LocalFlinkMiniCluster now uses an internal leader election service and
assigns leader ids to its components. Since this is an internal service, it
is not possible to retrieve this information as it is with the
ZooKeeper-based leader election services.

Long story short, the Flink Scala shell currently does not work with a
LocalFlinkMiniCluster and would have to be fixed to work properly with a
local execution environment. Until then, I recommend starting a local
standalone cluster and letting the code run there.
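
For example, after starting a standalone cluster with bin/start-cluster.sh, a
program can target it through a remote environment. A minimal sketch (host,
port, and jar path are placeholders, not values from this thread):

import org.apache.flink.api.scala._

object RemoteEnvExample {
  def main(args: Array[String]): Unit = {
    // Connect to the standalone cluster's JobManager instead of a LocalFlinkMiniCluster.
    val env = ExecutionEnvironment.createRemoteEnvironment(
      "localhost", 6123, "/path/to/your-program.jar")
    env.fromElements(1, 2, 3).map(_ * 2).print()
  }
}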

Cheers,
Till

On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang wrote:

> Dear all,
>
> *Below is the code I execute:*
>
> import java.io._
> import java.net.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
>
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>
> class FlinkInterpreter extends Interpreter {
>   private var bufferedReader: Option[BufferedReader] = None
>   private var jprintWriter: JPrintWriter = _
>   private val config = new Configuration
>   private var cluster: LocalFlinkMiniCluster = _
>   @BeanProperty var imain: IMain = _
>   @BeanProperty var flinkILoop: FlinkILoop = _
>   private var out: ByteBufOutputStream = null
>   private var outBuf: ByteBuf = null
>   private var in: ByteBufInputStream = _
>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>
>   override def isOpen: Boolean = {
>     isRunning.get()
>   }
>
>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>     config.toMap.toMap.foreach(println)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>     config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>     config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>     val localCluster = new LocalFlinkMiniCluster(config, false)
>     localCluster.start(true)
>     val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>     println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>     ("localhost", localCluster.getLeaderRPCPort, localCluster)
>   }
>
>   /**
>    * Start flink cluster and create interpreter
>    */
>   override def open: Unit = {
>     outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>     out = new ByteBufOutputStream(outBuf)
>     in = new ByteBufInputStream(outBuf)
>     // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>     val (host, port, localCluster) = startLocalMiniCluster()
>     this.cluster = localCluster
>     val conf = cluster.configuration
>     println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>     flinkILoop = new FlinkILoop(host, port, conf, None)
>     val settings = new Settings()
>     settings.usejavacp.value = true
>     settings.Yreplsync.value = true
>     flinkILoop.settings_$eq(settings)
>     flinkILoop.createInterpreter()
>     imain = flinkILoop.intp
>     FlinkInterpreter.ourClassloader = imain.classLoader
>     val benv = flinkILoop.scalaBenv
>     val senv = flinkILoop.scalaSenv
>     benv.getConfig.disableSysoutLogging()
>     senv.getConfig.disableSysoutLogging()
>     // import libraries
>     imain.interpret("import scala.tools.nsc.io._")
>     // imain.interpret("import Properties.userHome")
>     imain.interpret("import scala.compat.Platform.EOL")
>     imain.interpret("import org.apache.flink.api.scala._")
>     imain.interpret("import org.apache.flink.api.common.functions._")
>     isRunning.set(true)
>   }
>
>   override def i

[jira] [Created] (FLINK-7619) Improve abstraction in AbstractAsyncIOCallable to better fit

2017-09-14 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-7619:
-

 Summary: Improve abstraction in AbstractAsyncIOCallable to better fit
 Key: FLINK-7619
 URL: https://issues.apache.org/jira/browse/FLINK-7619
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter
Priority: Minor


The abstraction of AbstractAsyncIOCallable does not fit too well with today's 
needs in the checkpointing backends. Originally, backends were assumed to open 
only one stream, which is managed by the abstraction. In practice, concrete 
implementations always had to extend it beyond that. We can redo this so that 
the abstraction can manage more resources.
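
A rough sketch of the direction; the names and structure below are assumptions 
for illustration, not the planned Flink API. The idea is that the callable 
tracks every resource it opens and releases them all on completion or 
cancellation:

{code}
// Hypothetical sketch (not Flink's actual AbstractAsyncIOCallable):
// manage an arbitrary set of resources instead of a single stream.
import java.io.Closeable
import scala.collection.mutable.ArrayBuffer

abstract class MultiResourceCallable[V] {
  private val resources = ArrayBuffer.empty[Closeable]

  /** Register a resource so closeAll() can release it later. */
  protected def register[R <: Closeable](resource: R): R = {
    resources.synchronized(resources += resource)
    resource
  }

  /** Subclasses do their I/O here, registering everything they open. */
  def performOperation(): V

  /** Release all registered resources, e.g. on completion or cancellation. */
  def closeAll(): Unit = resources.synchronized {
    resources.foreach(r => try r.close() catch { case _: Exception => () })
    resources.clear()
  }
}
{code}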





[jira] [Created] (FLINK-7620) Support custom optimization phases

2017-09-14 Thread godfrey he (JIRA)
godfrey he created FLINK-7620:
-

 Summary: Support custom optimization phases
 Key: FLINK-7620
 URL: https://issues.apache.org/jira/browse/FLINK-7620
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: godfrey he
Assignee: godfrey he


Currently, the optimization phases are hardcoded in {{BatchTableEnvironment}} 
and {{StreamTableEnvironment}}. It would be better if users could define the 
optimization phases, and the rules applied in each phase, as needed.
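
A hypothetical sketch of what a pluggable phase could look like; this interface 
is an illustration, not an existing or proposed Flink API:

{code}
// Hypothetical sketch: a user-definable optimization phase with its own rule set.
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode

trait OptimizationPhase {
  def name: String
  def rules: Seq[RelOptRule]             // rules applied during this phase
  def optimize(input: RelNode): RelNode  // run the phase over the plan
}

object PhaseRunner {
  // A TableEnvironment could then run a user-supplied sequence of phases in order.
  def runPhases(phases: Seq[OptimizationPhase], plan: RelNode): RelNode =
    phases.foldLeft(plan)((rel, phase) => phase.optimize(rel))
}
{code}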








[jira] [Created] (FLINK-7621) Fix inconsistency of CaseSensitive Configuration

2017-09-14 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7621:
-

 Summary: Fix inconsistency of CaseSensitive Configuration
 Key: FLINK-7621
 URL: https://issues.apache.org/jira/browse/FLINK-7621
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


The default case-sensitivity configuration of Calcite, defined by {{Lex.java}}, 
is different from that of the Table API.
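
A minimal sketch of the kind of alignment meant here, assuming the fix goes 
through Calcite's parser configuration (where exactly this lands in the Flink 
code is not specified by this issue):

{code}
// Sketch: set Calcite's lexical policy explicitly so identifier case handling
// matches the Table API (Lex.JAVA is case-sensitive).
import org.apache.calcite.config.Lex
import org.apache.calcite.sql.parser.SqlParser

object CaseSensitiveParserConfig {
  val parserConfig: SqlParser.Config =
    SqlParser.configBuilder()
      .setLex(Lex.JAVA)
      .build()
}
{code}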





[jira] [Created] (FLINK-7622) Respect local KPL queue size in FlinkKinesisProducer when adding records to KPL client

2017-09-14 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-7622:
--

 Summary: Respect local KPL queue size in FlinkKinesisProducer when adding records to KPL client
 Key: FLINK-7622
 URL: https://issues.apache.org/jira/browse/FLINK-7622
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai


This issue was brought up by [~sthm] in an offline discussion.

Currently, records are added to the Kinesis KPL producer client without 
checking the number of outstanding records in the local KPL queue. This 
effectively ignores backpressure when producing to Kinesis through KPL, and 
can therefore exhaust system resources.

We should use {{producer.getOutstandingRecordsCount()}} as a measure of 
backpressure, and propagate backpressure upstream by blocking further sink 
invocations once some threshold of outstanding records is exceeded. The 
recommended threshold [1] seems to be 10,000.
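
A minimal sketch of the proposed blocking behavior, assuming a KPL 
{{KinesisProducer}} instance and the suggested threshold; the helper below is 
illustrative, not the connector's code:

{code}
// Sketch: block the sink invocation until the local KPL queue drains
// below the threshold, propagating backpressure upstream.
import com.amazonaws.services.kinesis.producer.KinesisProducer

object KplBackpressure {
  val queueLimit = 10000 // recommended threshold from the issue

  def enforceQueueLimit(producer: KinesisProducer): Unit = {
    while (producer.getOutstandingRecordsCount > queueLimit) {
      Thread.sleep(50) // simple polling wait; a callback-based wait would also work
    }
  }
}
{code}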





[jira] [Created] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-09-14 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-7623:
---

 Summary: Detecting whether an operator is restored doesn't work with chained state
 Key: FLINK-7623
 URL: https://issues.apache.org/jira/browse/FLINK-7623
 Project: Flink
  Issue Type: Bug
  Components: DataStream API, State Backends, Checkpointing
Affects Versions: 1.3.2, 1.4.0
Reporter: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.4.0, 1.3.3


Originally reported on the ML: 
https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E

If we have a chain of operators where multiple of them have operator state, 
detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) 
does not work correctly. It's best exemplified using this minimal example where 
both the source and the flatMap have state:
{code}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env
.addSource(new MaSource()).uid("source-1")
.flatMap(new MaFlatMap()).uid("flatMap-1");

env.execute("testing");
{code}

If I take a savepoint with these UIDs, then change "source-1" to "source-2" and 
restore from the savepoint, {{context.isRestored()}} still reports {{true}} for 
the source.





[jira] [Created] (FLINK-7624) Add kafka-topic for "KafkaProducer" metrics

2017-09-14 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-7624:
---

 Summary: Add kafka-topic for "KafkaProducer" metrics
 Key: FLINK-7624
 URL: https://issues.apache.org/jira/browse/FLINK-7624
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Reporter: Hai Zhou
Assignee: Hai Zhou
 Fix For: 1.4.0


Currently, metric names in the "KafkaProducer" MetricGroup look like this:
{code:java}
localhost.taskmanager.dc4092a96ea4e54ecdbd13b9a5c209b2.Flink Streaming Job.Sink--MTKafkaProducer08.0.KafkaProducer.record-queue-time-avg
{code}
The metric names in the "KafkaProducer" group do not include a Kafka topic 
name, so if a job writes data to two different Kafka sinks, these metrics 
cannot be distinguished.

I propose to modify the above metric name as follows:
{code:java}
localhost.taskmanager.dc4092a96ea4e54ecdbd13b9a5c209b2.Flink Streaming Job.Sink--MTKafkaProducer08.0.KafkaProducer.<kafka-topic>.record-queue-time-avg
{code}
Best,
Hai Zhou





[jira] [Created] (FLINK-7625) typo in docs metrics sections

2017-09-14 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-7625:
---

 Summary: typo in docs metrics sections
 Key: FLINK-7625
 URL: https://issues.apache.org/jira/browse/FLINK-7625
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Metrics
Affects Versions: 1.3.2
Reporter: Hai Zhou
Assignee: Hai Zhou
 Fix For: 1.4.0, 1.3.3


In the metrics docs, the Infix/Metrics table currently lists the metric 
*Memory.Heap.Used* under the infix {{Status.JVM.Memory}}; it should be 
*Heap.Used*, since the infix already contributes the {{Memory}} part of the 
full metric name.





Re: Question about Flink's savepoint

2017-09-14 Thread Aljoscha Krettek
Hi,

What is the source you're using in your Job and what filesystem (if any) is it 
writing to?

Best,
Aljoscha
> On 5. Sep 2017, at 03:06, Mu Kong  wrote:
> 
> Hi all,
> 
> I have some questions about an experience I had with savepoints.
> So, last night I found my Flink cluster's memory usage seemed weird, so I
> decided to
> 
> 1. create a savepoint for the running job (there was only one job running
> at the time)
> 2. then cancel the job from the web UI
> 3. restart the cluster
> 
> When I tried to resume the job with that savepoint, there was a
> "Truncate did not truncate to right length. Should be 11757 is 56383."
> exception.
> A savepoint is also created automatically every day at 4 a.m., so after I
> failed to run the job with the savepoint I had created before canceling the
> job, I tried the 4 a.m. savepoint instead, and it seemed to work well.
> 
> Then this morning, I noticed that data was lost for the period between
> canceling the job and resuming it.
> 
> I thought that if I ran the job with the savepoint created at 4 a.m., it
> should start processing data from 4 a.m. onward; or am I missing something here?
> 
> Also, I didn't add a uid to the addSource() function; maybe the
> auto-generated id changed when I restarted the cluster, and that might be
> the reason why the recovery didn't go well?
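
Regarding the last point: without an explicit uid, Flink derives operator ids 
from the job graph, and a changed graph can prevent savepoint state from being 
matched to its operator on restore. A minimal sketch of pinning uids (the 
source and names below are placeholders, not the job from this thread):

import org.apache.flink.streaming.api.scala._

object UidExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .socketTextStream("localhost", 9999) // placeholder source
      .uid("my-source")                    // stable id: savepoint state maps back here
      .flatMap(_.split("\\s+"))
      .uid("my-flatmap")
      .print()
    env.execute("uid-example")
  }
}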



[jira] [Created] (FLINK-7626) Add some metric description about checkpoints

2017-09-14 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-7626:
---

 Summary: Add some metric description about checkpoints
 Key: FLINK-7626
 URL: https://issues.apache.org/jira/browse/FLINK-7626
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Metrics
Affects Versions: 1.3.2
Reporter: Hai Zhou
Assignee: Hai Zhou
 Fix For: 1.4.0, 1.3.3


Add descriptions for the following checkpoint metrics to the "Debugging & Monitoring / Metrics" part of the docs:

{noformat}

// Number of total checkpoints (in progress, completed, failed).
totalNumberOfCheckpoints

// Number of in-progress checkpoints.
numberOfInProgressCheckpoints

// Number of successfully completed checkpoints.
numberOfCompletedCheckpoints

// Number of failed checkpoints.
numberOfFailedCheckpoints

// Timestamp when the checkpoint was restored at the coordinator.
lastCheckpointRestoreTimestamp

// Buffered bytes during alignment over all subtasks.
lastCheckpointAlignmentBuffered
{noformat}






[jira] [Created] (FLINK-7627) SingleElementIterable should implement Serializable

2017-09-14 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-7627:
--

 Summary: SingleElementIterable should implement Serializable
 Key: FLINK-7627
 URL: https://issues.apache.org/jira/browse/FLINK-7627
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


{{SingleElementIterable}} is used to merge accumulators, and it should be 
serializable because it will be serialized when a checkpoint is taken.
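
A minimal sketch of the shape of the fix; the class body here is an assumed 
reconstruction for illustration, not the actual Flink source:

{code}
// Sketch: mix in Serializable so the iterable survives checkpoint serialization.
class SingleElementIterable[E] extends java.lang.Iterable[E] with java.io.Serializable {
  private var element: E = _

  def set(element: E): Unit = this.element = element

  override def iterator(): java.util.Iterator[E] =
    new java.util.Iterator[E] with java.io.Serializable {
      private var consumed = false
      override def hasNext: Boolean = !consumed
      override def next(): E = { consumed = true; element }
      override def remove(): Unit = throw new UnsupportedOperationException
    }
}
{code}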


