[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2369 Just found some minor issues that can be fixed when merging.
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525284#comment-15525284 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2369 Just found some minor issues that can be fixed when merging. > Bump Kafka producer in Kafka sink to Kafka 0.10.0.0 > --- > > Key: FLINK-4035 > URL: https://issues.apache.org/jira/browse/FLINK-4035 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.0.3 > Reporter: Elias Levy > Assignee: Robert Metzger > Priority: Minor > > Kafka 0.10.0.0 introduced protocol changes related to the producer. > Published messages now include timestamps, and compressed messages now include > relative offsets. As it is now, brokers must decompress publisher-compressed > messages, assign offsets to them, and recompress them, which is wasteful and > makes it less likely that compression will be used at all.
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525278#comment-15525278 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r80629176

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---
@@ -102,11 +102,11 @@
	 * @param serializationSchema User defined (keyless) serialization schema.
	 * @param producerConfig Properties with the producer configuration.
	 */
-	public static FlinkKafkaProducer010Configuration writeToKafka(DataStream inStream,
-			String topicId,
-			SerializationSchema serializationSchema,
-			Properties producerConfig) {
-		return writeToKafka(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner());
+	public static FlinkKafkaProducer010Configuration writeToKafkaWithTimestamps(DataStream inStream,
--- End diff --

Add the generic type parameter `T` to `FlinkKafkaProducer010Configuration` here too?
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r80629075

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---
@@ -83,11 +83,11 @@
	 * @param serializationSchema User defined serialization schema supporting key/value messages
	 * @param producerConfig Properties with the producer configuration.
	 */
-	public static FlinkKafkaProducer010Configuration writeToKafka(DataStream inStream,
-			String topicId,
-			KeyedSerializationSchema serializationSchema,
-			Properties producerConfig) {
-		return writeToKafka(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner());
+	public static FlinkKafkaProducer010Configuration writeToKafkaWithTimestamps(DataStream inStream,
--- End diff --

Add the generic type parameter `T` to `FlinkKafkaProducer010Configuration` here too?
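For context, a hedged sketch of what the suggested change could look like once the element type `T` is propagated into the return type. This only illustrates the review suggestion; the five-argument overload it delegates to is assumed from the quoted diff, and the final merged signature may differ.

```java
// Sketch of the reviewer's suggestion: make the factory generic in the stream's
// element type T so the returned FlinkKafkaProducer010Configuration is typed
// rather than raw. FixedPartitioner and the delegated overload come from the diff.
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
        DataStream<T> inStream,
        String topicId,
        KeyedSerializationSchema<T> serializationSchema,
        Properties producerConfig) {
    return writeToKafkaWithTimestamps(
            inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner());
}
```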
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525277#comment-15525277 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r80629075 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java --- Add the generic type parameter `T` to `FlinkKafkaProducer010Configuration` here too?
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r80629176 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java --- Add the generic type parameter `T` to `FlinkKafkaProducer010Configuration` here too?
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r80628475

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. [...]
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+[...]
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x.
+ *
+ * Implementation note: This producer is a hybrid between a regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
+ * DataStream.addSink() method.
+ * Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record,
+ * the Kafka 0.10 producer has a second invocation option, approach (b).
+ *
+ * Details about approach (b):
+ * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
+ * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
+ * can access the internal record timestamp of the record and write it to Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach they are needed for.
+ */
+public class FlinkKafkaProducer010 extends StreamSink implements SinkFunction, RichFunction {
+
+	/**
+	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+	 */
+	private boolean writeTimestampToKafka = false;
+
+	// -- "Constructors" for timestamp writing --
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined serialization schema supporting key/value messages
+	 * @param producerConfig Properties with the producer configuration.
+	 */
+	public static FlinkKafkaProducer010Configuration writeToKafkaWithTimestamps(DataStream inStream,
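To make the two invocation styles from that Javadoc concrete, here is a hedged usage sketch. The topic name, broker list, `SimpleStringSchema`, and the `setWriteTimestampToKafka` setter are illustrative assumptions based on the PR discussion, and the typed `FlinkKafkaProducer010Configuration<String>` assumes the generic parameter suggested in the review above is added.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSinkUsageSketch {

    public static void attachKafkaSink(DataStream<String> stream) {
        Properties producerConfig =
                FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092");

        // Approach (a): plain sink function added via DataStream.addSink();
        // the producer cannot see the record's event timestamp in this mode.
        stream.addSink(new FlinkKafkaProducer010<>(
                "my-topic", new SimpleStringSchema(), producerConfig));

        // Approach (b): custom-operator variant that can read the record's
        // timestamp and write it into the Kafka message.
        FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
                FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                        stream, "my-topic", new SimpleStringSchema(), producerConfig);
        config.setWriteTimestampToKafka(true);
    }
}
```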
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525262#comment-15525262 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r80628475 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---
[GitHub] flink issue #2517: [FLINK-4564] [metrics] Delimiter should be configured per...
Github user ex00 commented on the issue: https://github.com/apache/flink/pull/2517 zentol, thanks! I have pushed the edited implementation again.
[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter
[ https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525242#comment-15525242 ] ASF GitHub Bot commented on FLINK-4564: --- Github user ex00 commented on the issue: https://github.com/apache/flink/pull/2517 zentol, thanks! I have pushed the edited implementation again. > [metrics] Delimiter should be configured per reporter > - > > Key: FLINK-4564 > URL: https://issues.apache.org/jira/browse/FLINK-4564 > Project: Flink > Issue Type: Bug > Components: Metrics > Affects Versions: 1.1.0 > Reporter: Chesnay Schepler > Assignee: Anton Mushin > > Currently, the delimiter used for the scope string is based on a configuration > setting shared by all reporters. However, different reporters may have > different requirements in regards to the delimiter, as such we should allow > reporters to use a different delimiter. > We can keep the current setting as a global setting that is used if no > specific setting was set.
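A small sketch of the fallback behaviour the issue describes — a reporter-specific delimiter that defaults to the shared global setting — may help. The configuration key names used here are assumptions for illustration, not necessarily the option names introduced by the PR.

```java
import org.apache.flink.configuration.Configuration;

public final class DelimiterLookup {

    /**
     * Returns the scope delimiter for the given reporter: the reporter-specific
     * setting if one is configured, otherwise the shared global default.
     */
    public static String scopeDelimiter(Configuration config, String reporterName) {
        // Hypothetical key names; the global setting acts as the fallback.
        String globalDelimiter = config.getString("metrics.scope.delimiter", ".");
        return config.getString(
                "metrics.reporter." + reporterName + ".scope.delimiter", globalDelimiter);
    }
}
```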
[jira] [Commented] (FLINK-4339) Implement Slot Pool Core
[ https://issues.apache.org/jira/browse/FLINK-4339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525190#comment-15525190 ] ASF GitHub Bot commented on FLINK-4339: --- Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/2497 Rebased to the latest flip-6 and used Flink's own Future instead of Scala's Future. > Implement Slot Pool Core > > > Key: FLINK-4339 > URL: https://issues.apache.org/jira/browse/FLINK-4339 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Affects Versions: 1.1.0 > Reporter: Stephan Ewen > Assignee: Kurt Young > Fix For: 1.2.0 > > > Implements the core slot structures and behavior of the {{SlotPool}}: > - pool of available slots > - request slots and response if slot is available in pool > - return / deallocate slots > Detailed design here: > https://docs.google.com/document/d/1y4D-0KGiMNDFYOLRkJy-C04nl8fwJNdm9hoUfxce6zY/
[GitHub] flink issue #2497: [FLINK-4339][cluster management] Implement Slot Pool core...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/2497 Rebased to the latest flip-6 and used Flink's own Future instead of Scala's Future.
[jira] [Commented] (FLINK-4606) Integrate the new ResourceManager with the existing FlinkResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525181#comment-15525181 ] ASF GitHub Bot commented on FLINK-4606: --- Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2540 @mxm , thanks for your review, I modified the PR based on your advice: 1. Fixed the checkstyle error and the `AkkaRpcActorTest` and `RpcCompletenessTest` test cases. Sorry for those mistakes, I will take care of it next time. 2. About the ResourceManager: I adopted ResourceManager extends RpcEndpoint at first, but it failed with an exception when I wanted to start a subclass of this ResourceManager. For example, with public class StandaloneResourceManager extends ResourceManager, when I start this ResourceManager it calls AkkaRpcService#startServer, and an exception is thrown there because the selfGatewayType was mistaken for the TaskExecutorRegistration class. So I changed it to ResourceManager extends RpcEndpoint > Integrate the new ResourceManager with the existing FlinkResourceManager > > > Key: FLINK-4606 > URL: https://issues.apache.org/jira/browse/FLINK-4606 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: zhangjing > Assignee: zhangjing > > Integrate the new ResourceManager with the existing FlinkResourceManager.
[GitHub] flink issue #2540: [FLINK-4606] [cluster management] Integrate the new Resou...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2540 @mxm , thanks for your review, I modified the PR based on your advice: 1. Fixed the checkstyle error and the `AkkaRpcActorTest` and `RpcCompletenessTest` test cases. Sorry for those mistakes, I will take care of it next time. 2. About the ResourceManager: I adopted ResourceManager extends RpcEndpoint at first, but it failed with an exception when I wanted to start a subclass of this ResourceManager. For example, with public class StandaloneResourceManager extends ResourceManager, when I start this ResourceManager it calls AkkaRpcService#startServer, and an exception is thrown there because the selfGatewayType was mistaken for the TaskExecutorRegistration class. So I changed it to ResourceManager extends RpcEndpoint
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525179#comment-15525179 ] ASF GitHub Bot commented on FLINK-2055: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 OK. Then it should be clearly documented now that the sink supports only Puts/Deletes. So in future can the sink be updated with new APIs? I don't know the procedure here in case new APIs have to be added to the sink. Since the sink is not part of the FLINK distro it can be enhanced or modified. Just wanted to know. Maybe as a first step make things simple: -> Support only Puts/Deletes -> If you want order to be guaranteed then go with RowMutations -> Clearly document what the sink does now -> Please verify if it is OK to update the sink to support other APIs in future. Anyway this will give you an at-least-once guarantee. > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors > Affects Versions: 0.9 > Reporter: Robert Metzger > Assignee: Erli Ding > > As per : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 OK. Then it should be clearly documented now that the sink supports only Puts/Deletes. So in future can the sink be updated with new APIs? I don't know the procedure here in case new APIs have to be added to the sink. Since the sink is not part of the FLINK distro it can be enhanced or modified. Just wanted to know. Maybe as a first step make things simple: -> Support only Puts/Deletes -> If you want order to be guaranteed then go with RowMutations -> Clearly document what the sink does now -> Please verify if it is OK to update the sink to support other APIs in future. Anyway this will give you an at-least-once guarantee.
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525163#comment-15525163 ] ASF GitHub Bot commented on FLINK-2055: --- Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 I agree it might be an overkill. But in the case of having a sink that only supports Put/Delete, it would be better to have ordered execution than not; after all, HBase has this API, so there could be some use case that needs an order guarantee.
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 I agree it might be an overkill. But in the case of having a sink that only supports Put/Delete, it would be better to have ordered execution than not; after all, HBase has this API, so there could be some use case that needs an order guarantee.
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525128#comment-15525128 ] ASF GitHub Bot commented on FLINK-2055: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 Within a single row you need a guarantee of the order of execution? I agree Append/Increment are non-idempotent in certain failure cases, but there is a Nonce generator for that. I should say I have not used that anyway to know its exact pros and cons. RowMutations at least supports only Put/Delete, so from that angle you can be sure that for now we don't support Append/Increments. > we might need WriteAheadSink and figure out a way to roll back table to the last checkpoint state This will be tricky. Your HBase table rollback would mean that you may have to issue Deletes here so that the previous mutations are hidden.
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 Within a single row you need a guarantee of the order of execution? I agree Append/Increment are non-idempotent in certain failure cases, but there is a Nonce generator for that. I should say I have not used that anyway to know its exact pros and cons. RowMutations at least supports only Put/Delete, so from that angle you can be sure that for now we don't support Append/Increments. > we might need WriteAheadSink and figure out a way to roll back table to the last checkpoint state This will be tricky. Your HBase table rollback would mean that you may have to issue Deletes here so that the previous mutations are hidden.
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525108#comment-15525108 ] ASF GitHub Bot commented on FLINK-2055: --- Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @ramkrish86 , I'm thinking of replacing batch() with mutateRow() because it provides atomic, ordered mutations for a single row. It only supports Put and Delete, which should be fine since only Put and Delete are idempotent; this way we can implement Put and Delete without using WriteAheadSink (in case of deterministic processing). What do you think? Regarding Append and Delete, as HBase doesn't support distributed transactions across multiple rows, we might need WriteAheadSink and figure out a way to roll back the table to the last checkpoint state. I'm thinking about this right now. So it might make sense to have two HBaseSinks, one for Put/Delete, the other for Append/Delete and non-deterministic processing.
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi @ramkrish86 , I'm thinking of replacing batch() with mutateRow() because it provides atomic, ordered mutations for a single row. It only supports Put and Delete, which should be fine since only Put and Delete are idempotent; this way we can implement Put and Delete without using WriteAheadSink (in case of deterministic processing). What do you think? Regarding Append and Delete, as HBase doesn't support distributed transactions across multiple rows, we might need WriteAheadSink and figure out a way to roll back the table to the last checkpoint state. I'm thinking about this right now. So it might make sense to have two HBaseSinks, one for Put/Delete, the other for Append/Delete and non-deterministic processing.
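For illustration, a minimal sketch of the `mutateRow()` idea under discussion, written directly against the HBase client API. The table, column family, and qualifier names are placeholders, and this is not the connector code from the PR.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class MutateRowSketch {

    public static void writeSingleRow() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection();
                Table table = connection.getTable(TableName.valueOf("sink-table"))) {

            byte[] row = Bytes.toBytes("row-key");
            RowMutations mutations = new RowMutations(row);

            // Put and Delete are idempotent, so replaying them after a failure is safe.
            Put put = new Put(row);
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("value"));
            mutations.add(put);

            Delete delete = new Delete(row);
            delete.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("stale-col"));
            mutations.add(delete);

            // Unlike batch(), all mutations for this row are applied atomically and in order.
            table.mutateRow(mutations);
        }
    }
}
```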
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525031#comment-15525031 ] ASF GitHub Bot commented on FLINK-2055: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 @delding Do you have use cases with Append/Increment? I think with the batch() API we are not sure of the order of execution of the batch() in the HBase server, but it would still help us to achieve the at-least-once guarantee as @zentol mentioned here.
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2332 @delding Do you have use cases with Append/Increment? I think with the batch() API we are not sure of the order of execution of the batch() in the HBase server, but it would still help us to achieve the at-least-once guarantee as @zentol mentioned here.
[jira] [Commented] (FLINK-4690) Replace SlotAllocationFuture with flink's own future
[ https://issues.apache.org/jira/browse/FLINK-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525008#comment-15525008 ] ASF GitHub Bot commented on FLINK-4690: --- GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/2552 [FLINK-4690] Replace SlotAllocationFuture with flink's own future You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-4690 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2552.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2552 commit f2303f94b5cc18086f5bf34566079cc86a5187e9 Author: Kurt Young Date: 2016-09-27T04:10:08Z [FLINK-4690] Replace SlotAllocationFuture with flink's own future > Replace SlotAllocationFuture with flink's own future > > > Key: FLINK-4690 > URL: https://issues.apache.org/jira/browse/FLINK-4690 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: Kurt Young > Assignee: Kurt Young
[GitHub] flink pull request #2552: [FLINK-4690] Replace SlotAllocationFuture with fli...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/2552 [FLINK-4690] Replace SlotAllocationFuture with flink's own future You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-4690 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2552.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2552 commit f2303f94b5cc18086f5bf34566079cc86a5187e9 Author: Kurt Young Date: 2016-09-27T04:10:08Z [FLINK-4690] Replace SlotAllocationFuture with flink's own future
[jira] [Created] (FLINK-4690) Replace SlotAllocationFuture with flink's own future
Kurt Young created FLINK-4690: - Summary: Replace SlotAllocationFuture with flink's own future Key: FLINK-4690 URL: https://issues.apache.org/jira/browse/FLINK-4690 Project: Flink Issue Type: Sub-task Reporter: Kurt Young Assignee: Kurt Young
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524939#comment-15524939 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2369 @rmetzger Thanks for addressing the comments! Did a final pass, and the changes look good to me. I agree with merging the connector as is. Adding the timestamp to the regular sink interface seems like a good long term solution. +1 to merge once travis turns green ;)
[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2369 @rmetzger Thanks for addressing the comments! Did a final pass, and the changes look good to me. I agree with merging the connector as is. Adding the timestamp to the regular sink interface seems like a good long term solution. +1 to merge once travis turns green ;)
[jira] [Commented] (FLINK-4606) Integrate the new ResourceManager with the existing FlinkResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524850#comment-15524850 ] ASF GitHub Bot commented on FLINK-4606: --- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2540#discussion_r80612002

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -66,15 +67,16 @@
 * {@link #requestSlot(SlotRequest)} requests a slot from the resource manager
 *
 */
-public class ResourceManager extends RpcEndpoint implements LeaderContender {
+public abstract class ResourceManager extends RpcEndpoint implements LeaderContender {
--- End diff --

@mxm , I adopted `ResourceManager extends RpcEndpoint` at first, but it failed with an exception when I wanted to start a subclass of this ResourceManager. For example, with `public class StandaloneResourceManager extends ResourceManager`, when I start this ResourceManager it calls AkkaRpcService#startServer, and an exception is thrown there because the selfGatewayType was mistaken for the TaskExecutorRegistration class. So I changed it to `ResourceManager extends RpcEndpoint`
[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2540#discussion_r80612002 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @mxm , I adopted `ResourceManager extends RpcEndpoint` at first, but it failed with an exception when I wanted to start a subclass of this ResourceManager. For example, with `public class StandaloneResourceManager extends ResourceManager`, when I start this ResourceManager it calls AkkaRpcService#startServer, and an exception is thrown there because the selfGatewayType was mistaken for the TaskExecutorRegistration class. So I changed it to `ResourceManager extends RpcEndpoint`
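The generic type parameters in the diff and comment above were stripped by the plain-text rendering, so the following is only a rough, hedged reconstruction of the class relationship being described; the exact signatures in the PR may differ.

```java
// Illustrative only: AkkaRpcService.startServer() derives the endpoint's
// self-gateway type from the RpcEndpoint type parameter, so that parameter
// has to resolve to ResourceManagerGateway for every concrete subclass.
public abstract class ResourceManager extends RpcEndpoint<ResourceManagerGateway>
        implements LeaderContender {
    // shared registration and slot-request handling
}

public class StandaloneResourceManager extends ResourceManager {
    // standalone-mode specifics; inherits the gateway binding from the base class
}
```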
[jira] [Commented] (FLINK-4653) Refactor JobClientActor to adapt to the new Rpc framework and new cluster management
[ https://issues.apache.org/jira/browse/FLINK-4653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524806#comment-15524806 ] ASF GitHub Bot commented on FLINK-4653: --- Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2524 @mxm , thanks for your review. I modified the content based on your advice: 1. The changes compile successfully now, sorry for the mistake. 2. The awaitJobResult method in JobClientUtils now retries upon TimeoutException until the JobInfoTracker seems to be dead. > Refactor JobClientActor to adapt to the new Rpc framework and new cluster > management > - > > Key: FLINK-4653 > URL: https://issues.apache.org/jira/browse/FLINK-4653 > Project: Flink > Issue Type: Sub-task > Components: Client > Reporter: zhangjing > Assignee: zhangjing > Fix For: 1.2.0 > > > 1. Create a RpcEndpoint (temporarily named JobInfoTracker) and > RpcGateway (temporarily named JobInfoTrackerGateway) to replace the old > JobClientActor. > 2. Change rpc message communication in JobClientActor to rpc method calls to > apply to the new rpc framework. > 3. JobInfoTracker is responsible for waiting for the jobStateChange and > jobResult until the job completes. But it is not responsible for submitting new jobs > because jobSubmission behavior is different in different clusters
[GitHub] flink issue #2524: [FLINK-4653] [Client] Refactor JobClientActor to adapt to...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2524 @mxm , thanks for your review. I modified the content based on your advice: 1. The changes compile successfully now, sorry for the mistake. 2. The awaitJobResult method in JobClientUtils now retries upon TimeoutException until the JobInfoTracker seems to be dead.
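A hedged sketch of the retry behaviour described in point 2: keep asking for the job result, retry on timeouts, and give up only once the JobInfoTracker no longer appears to be alive. The gateway interface, its `requestJobResult()` method, and the `isReachable()` liveness probe are hypothetical names used purely for illustration, and the `String` result stands in for whatever result type the real gateway returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class AwaitJobResultSketch {

    // Hypothetical gateway; stands in for whatever JobInfoTrackerGateway exposes.
    interface JobInfoTrackerGateway {
        CompletableFuture<String> requestJobResult();
        boolean isReachable();
    }

    public static String awaitJobResult(JobInfoTrackerGateway tracker, long timeoutMillis)
            throws Exception {
        while (true) {
            try {
                return tracker.requestJobResult().get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                if (!tracker.isReachable()) {
                    throw new Exception("JobInfoTracker appears to be dead", e);
                }
                // Tracker is still alive: fall through and retry the request.
            } catch (ExecutionException e) {
                throw new Exception("Job execution failed", e.getCause());
            }
        }
    }
}
```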
[jira] [Created] (FLINK-4689) Implement a simple slot provider for the new job manager
Kurt Young created FLINK-4689: - Summary: Implement a simple slot provider for the new job manager Key: FLINK-4689 URL: https://issues.apache.org/jira/browse/FLINK-4689 Project: Flink Issue Type: Sub-task Reporter: Kurt Young Assignee: Kurt Young In the flip-6 branch, we need to adjust the existing scheduling model. In the first step, we should introduce a simple / naive slot provider which just ignores all the sharing or location constraints, to make the whole thing work.
[jira] [Resolved] (FLINK-4408) Submit Job and setup ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young resolved FLINK-4408. --- Resolution: Fixed > Submit Job and setup ExecutionGraph > --- > > Key: FLINK-4408 > URL: https://issues.apache.org/jira/browse/FLINK-4408 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: Xiaogang Shi > Assignee: Kurt Young > > Once granted the leadership, JM will start to execute the job. > Most code remains the same except that > (1) In old implementation where JM manages the execution of multiple jobs, JM > has to load all submitted JobGraphs from SubmittedJobGraphStore and recover > them. Now that the components creating JM will be responsible for the > recovery of JobGraphs, JM will be created with submitted/recovered JobGraph, > without the need to load the JobGraph. > (2) JM should not rely on Akka to listen on the updates of JobStatus and > Execution.
[jira] [Commented] (FLINK-4408) Submit Job and setup ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524664#comment-15524664 ] ASF GitHub Bot commented on FLINK-4408: --- Github user KurtYoung closed the pull request at: https://github.com/apache/flink/pull/2480
[GitHub] flink pull request #2480: [FLINK-4408][JobManager] Introduce JobMasterRunner...
Github user KurtYoung closed the pull request at: https://github.com/apache/flink/pull/2480
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524101#comment-15524101 ] ASF GitHub Bot commented on FLINK-4280: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/2509 @tzulitai makes sense! As for the Map you are right, the multiple topic case slipped my mind :) > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > will {{auto.offset.reset}} be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users that may need some > collaboration with existing non-Flink Kafka consumer applications. > How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "...") // this won't have effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to lookup external > offsets in ZK / broker on startup > ... > {code} > A thing we would need to decide on is what would the default value be for > {{flink.starting-position}}. > Two merits I see in adding this: > 1. This compensates the way users generally interpret "read from a starting > position". As the Flink Kafka connector is somewhat essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. > I think adding this "decouples" more of Flink's internal offset checkpointing from > the external Kafka offset store.
[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...
Github user gyfora commented on the issue: https://github.com/apache/flink/pull/2509 @tzulitai makes sense! As for the Map you are right, the multiple topic case slipped my mind :)
[jira] [Closed] (FLINK-598) Add support for globalOrdering in DataSet API
[ https://issues.apache.org/jira/browse/FLINK-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neelesh Srinivas Salian closed FLINK-598. - Resolution: Fixed Closing since the sub-tasks are Closed and Resolved. > Add support for globalOrdering in DataSet API > - > > Key: FLINK-598 > URL: https://issues.apache.org/jira/browse/FLINK-598 > Project: Flink > Issue Type: Improvement > Reporter: GitHub Import > Labels: github-import > > There is no support for globalOrdering at the moment. In the Record API, it > was possible to hand an Ordering and a Distribution to a FileDataSink. In the > DataSet API, such a feature is still missing. > Imported from GitHub > Url: https://github.com/stratosphere/stratosphere/issues/598 > Created by: [skunert|https://github.com/skunert] > Labels: enhancement, java api, user satisfaction, > Assignee: [fhueske|https://github.com/fhueske] > Created at: Mon Mar 17 14:08:05 CET 2014 > State: open
[jira] [Commented] (FLINK-4439) Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
[ https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523891#comment-15523891 ] ASF GitHub Bot commented on FLINK-4439: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2397 I've rebased to current master and triggered a build. https://travis-ci.org/rmetzger/flink/builds/162871029
> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> --
>
> Key: FLINK-4439
> URL: https://issues.apache.org/jira/browse/FLINK-4439
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.0.3
> Reporter: Gheorghe Gheorghe
> Priority: Minor
>
> The "flink-connector-kafka-0.8_2" is logging the following error when all
> 'bootstrap.servers' are invalid when passed to the FlinkKafkaConsumer08.
> See stack trace:
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN FlinkKafkaConsumerBase:290 - Error communicating with broker inexistentKafkHost:9092 to find partitions for [testTopic].class java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
> at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:164)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:131)
> at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
> at MetricsFromKafka.main(MetricsFromKafka.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at sbt.Run.invokeMain(Run.scala:67)
> at sbt.Run.run0(Run.scala:61)
> at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
> at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
> at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
> at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
> at sbt.Logger$$anon$4.apply(Logger.scala:84)
> at sbt.TrapExit$App.run(TrapExit.scala:248)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> In the above stack trace it is hard to figure out that the actual servers provided as a config cannot be resolved to a valid ip address. Moreover the flink kafka consumer will try all of those servers one by one and fail to get partition information.
> The suggested improvement is to fail fast and tell the user that the servers provided in the 'bootstrap.servers' config are invalid. If at least one server is valid then the exception should not be thrown.
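To illustrate the fail-fast check the issue proposes — resolve the configured hosts eagerly and complain only if none of them is valid — here is a hedged sketch; it is not the code from the linked PR.

```java
import java.net.InetSocketAddress;
import java.util.Properties;

public final class BootstrapServersValidator {

    /** Throws if none of the configured 'bootstrap.servers' entries can be resolved. */
    public static void validate(Properties props) {
        String servers = props.getProperty("bootstrap.servers", "");
        boolean anyResolvable = false;
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon > 0 ? trimmed.substring(0, colon) : trimmed;
            int port = colon > 0 ? Integer.parseInt(trimmed.substring(colon + 1)) : 9092;
            // InetSocketAddress resolves the host name eagerly; isUnresolved()
            // reports whether that lookup failed.
            if (!new InetSocketAddress(host, port).isUnresolved()) {
                anyResolvable = true;
            }
        }
        if (!anyResolvable) {
            throw new IllegalArgumentException(
                    "None of the hosts in 'bootstrap.servers' could be resolved: " + servers);
        }
    }
}
```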
[GitHub] flink issue #2397: [FLINK-4439] Validate 'bootstrap.servers' config in flink...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2397 I've rebased to current master and triggered a build. https://travis-ci.org/rmetzger/flink/builds/162871029 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523868#comment-15523868 ] ASF GitHub Bot commented on FLINK-2055: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2332 @nielsbasjes I want to avoid excluding a version that is still in use by existing Flink users. I do not have insight into which HBase versions are currently in use. If (basically) everybody is on 1.1.x, I am definitely in favor of bumping the version and API for Flink 1.2.0. Let's move the discussion to the dev and user mailing lists. > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Erli Ding > > As per: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2332 @nielsbasjes I want to avoid excluding a version that is still in use by existing Flink users. I do not have insight into which HBase versions are currently in use. If (basically) everybody is on 1.1.x, I am definitely in favor of bumping the version and API for Flink 1.2.0. Let's move the discussion to the dev and user mailing lists. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523849#comment-15523849 ] ASF GitHub Bot commented on FLINK-2055: --- Github user nielsbasjes commented on the issue: https://github.com/apache/flink/pull/2332 @fhueske: What is "older"? I would like a clear statement about the (minimal) supported versions of HBase. I would see 1.1.x as old enough, or do you see 0.98 as still required? > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Erli Ding > > As per: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user nielsbasjes commented on the issue: https://github.com/apache/flink/pull/2332 @fhueske: What is "older"? I would like a clear statement about the (minimal) supported versions of HBase. I would see 1.1.x as old enough, or do you see 0.98 as still required? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-4688) Optimizer hangs for hours when optimizing complex plans
[ https://issues.apache.org/jira/browse/FLINK-4688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-4688: - Attachment: stacktrace_76minsAfterSubmission.java Stacktrace 76 mins after submission > Optimizer hangs for hours when optimizing complex plans > --- > > Key: FLINK-4688 > URL: https://issues.apache.org/jira/browse/FLINK-4688 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 1.2.0, 1.1.3 >Reporter: Fabian Hueske > Attachments: stacktrace_32minsAfterSubmission.java, > stacktrace_76minsAfterSubmission.java, stacktrace_shortlyAfterSubmission.java > > > When optimizing a plan with many operators (more than 250), the optimizer > gets stuck for hours. > A user reported this problem on the user@f.a.o list [1] and provided > stacktraces taken at different points in time (shortly after submission, 32 > minutes and 76 minutes after submission) (see attachments). > The stacktraces show deeply recursive {{hasDamOnPathDownTo()}} calls. Maybe > it is possible to improve the performance by caching the results? > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-batch-workflow-needs-too-much-time-to-create-executionPlan-tp8596.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4688) Optimizer hangs for hours when optimizing complex plans
[ https://issues.apache.org/jira/browse/FLINK-4688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-4688: - Attachment: stacktrace_shortlyAfterSubmission.java Stacktrace shortly after submission > Optimizer hangs for hours when optimizing complex plans > --- > > Key: FLINK-4688 > URL: https://issues.apache.org/jira/browse/FLINK-4688 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 1.2.0, 1.1.3 >Reporter: Fabian Hueske > Attachments: stacktrace_32minsAfterSubmission.java, > stacktrace_shortlyAfterSubmission.java > > > When optimizing a plan with many operators (more than 250), the optimizer > gets stuck for hours. > A user reported this problem on the user@f.a.o list [1] and provided > stacktraces taken at different points in time (shortly after submission, 32 > minutes and 76 minutes after submission) (see attachments). > The stacktraces show deeply recursive {{hasDamOnPathDownTo()}} calls. Maybe > it is possible to improve the performance by caching the results? > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-batch-workflow-needs-too-much-time-to-create-executionPlan-tp8596.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4688) Optimizer hangs for hours when optimizing complex plans
[ https://issues.apache.org/jira/browse/FLINK-4688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-4688: - Attachment: stacktrace_32minsAfterSubmission.java Stacktrace 32 mins after submission > Optimizer hangs for hours when optimizing complex plans > --- > > Key: FLINK-4688 > URL: https://issues.apache.org/jira/browse/FLINK-4688 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 1.2.0, 1.1.3 >Reporter: Fabian Hueske > Attachments: stacktrace_32minsAfterSubmission.java, > stacktrace_shortlyAfterSubmission.java > > > When optimizing a plan with many operators (more than 250), the optimizer > gets stuck for hours. > A user reported this problem on the user@f.a.o list [1] and provided > stacktraces taken at different points in time (shortly after submission, 32 > minutes and 76 minutes after submission) (see attachments). > The stacktraces show deeply recursive {{hasDamOnPathDownTo()}} calls. Maybe > it is possible to improve the performance by caching the results? > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-batch-workflow-needs-too-much-time-to-create-executionPlan-tp8596.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4688) Optimizer hangs for hours when optimizing complex plans
Fabian Hueske created FLINK-4688: Summary: Optimizer hangs for hours when optimizing complex plans Key: FLINK-4688 URL: https://issues.apache.org/jira/browse/FLINK-4688 Project: Flink Issue Type: Bug Components: Optimizer Affects Versions: 1.2.0, 1.1.3 Reporter: Fabian Hueske When optimizing a plan with many operators (more than 250), the optimizer gets stuck for hours. A user reported this problem on the user@f.a.o list [1] and provided stacktraces taken at different points in time (shortly after submission, 32 minutes and 76 minutes after submission) (see attachments). The stacktraces show deeply recursive {{hasDamOnPathDownTo()}} calls. Maybe it is possible to improve the performance by caching the results? [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-batch-workflow-needs-too-much-time-to-create-executionPlan-tp8596.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
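The caching idea raised in the issue description can be illustrated with a small, generic sketch: memoize the per-node result of a recursive check over a DAG so each node is evaluated only once. This is not the optimizer's actual {{hasDamOnPathDownTo()}} code; all names below are hypothetical.

{code}
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a recursive reachability check whose results are memoized per node.
// Without the cache, nodes reachable through many paths are re-visited an exponential
// number of times, which matches the deeply recursive stack traces attached to the issue.
class MemoizedReachability<N> {

    private final Map<N, List<N>> inputs;            // node -> its predecessor nodes
    private final N target;                          // the node we are looking for on the path down
    private final Map<N, Boolean> cache = new HashMap<>();

    MemoizedReachability(Map<N, List<N>> inputs, N target) {
        this.inputs = inputs;
        this.target = target;
    }

    boolean reachesTarget(N node) {
        if (node.equals(target)) {
            return true;
        }
        Boolean cached = cache.get(node);
        if (cached != null) {
            return cached; // reuse the previously computed answer instead of re-descending
        }
        boolean result = false;
        for (N input : inputs.getOrDefault(node, Collections.<N>emptyList())) {
            if (reachesTarget(input)) {
                result = true;
                break;
            }
        }
        cache.put(node, result);
        return result;
    }
}
{code}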
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523705#comment-15523705 ] ASF GitHub Bot commented on FLINK-4280: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2509 Thank you for working on this. I gave #2369 some love today to speed up things ;) > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > will {{auto.offset.reset}} be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users that may need some > collaboration with existing non-Flink Kafka consumer applications. > How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "...") // this won't have effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to lookup external > offsets in ZK / broker on startup > ... > {code} > A thing we would need to decide on is what would the default value be for > {{flink.starting-position}}. > Two merits I see in adding this: > 1. This compensates the way users generally interpret "read from a starting > position". As the Flink Kafka connector is somewhat essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. 
I > think adding this further "decouples" Flink's internal offset checkpointing from > the external Kafka offset store. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
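A hedged sketch of how a consumer could map the proposed {{flink.starting-position}} key to an internal start-up mode; the enum, the class name, and the default chosen here are illustrative assumptions, not the final design (the issue explicitly leaves the default open):

{code}
import java.util.Properties;

// Illustrative only: translate the proposed config key into an internal mode.
class StartingPositionResolver {

    enum StartupMode { EARLIEST, LATEST, EXTERNAL_OFFSETS }

    static StartupMode resolve(Properties props) {
        // The fallback value is arbitrary here; picking the default is an open question in the issue.
        String value = props.getProperty("flink.starting-position", "external-offsets");
        switch (value) {
            case "earliest":
                return StartupMode.EARLIEST;         // ignore external offsets, read from the beginning
            case "latest":
                return StartupMode.LATEST;           // ignore external offsets, read from the end
            case "external-offsets":
                return StartupMode.EXTERNAL_OFFSETS; // consult ZK / broker offsets, fall back to auto.offset.reset
            default:
                throw new IllegalArgumentException(
                    "Unknown value for 'flink.starting-position': " + value);
        }
    }
}
{code}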
[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2509 Thank you for working on this. I gave #2369 some love today to speed up things ;) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4439) Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
[ https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523703#comment-15523703 ] ASF GitHub Bot commented on FLINK-4439: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2397 Thank you. The pull request is now good to be merged! > Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid > -- > > Key: FLINK-4439 > URL: https://issues.apache.org/jira/browse/FLINK-4439 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Gheorghe Gheorghe >Priority: Minor > > The "flink-connector-kafka-0.8_2" is logging the following error when all > 'bootstrap.servers' are invalid when passed to the FlinkKafkaConsumer08. > See stacktrace: > {code:title=stacktrace|borderStyle=solid} > 2016-08-21 15:22:30 WARN FlinkKafkaConsumerBase:290 - Error communicating > with broker inexistentKafkHost:9092 to find partitions for [testTopic].class > java.nio.channels.ClosedChannelException. Message: null > 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91) > at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:164) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:131) > at MetricsFromKafka$.main(MetricsFromKafka.scala:38) > at MetricsFromKafka.main(MetricsFromKafka.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at sbt.Run.invokeMain(Run.scala:67) > at sbt.Run.run0(Run.scala:61) > at sbt.Run.sbt$Run$$execute$1(Run.scala:51) > at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55) > at sbt.Run$$anonfun$run$1.apply(Run.scala:55) > at sbt.Run$$anonfun$run$1.apply(Run.scala:55) > at sbt.Logger$$anon$4.apply(Logger.scala:84) > at sbt.TrapExit$App.run(TrapExit.scala:248) > at java.lang.Thread.run(Thread.java:745) > {code} > In the above stacktrace it is hard to figure out that the actual servers > provided as a config cannot be resolved to a valid IP address. Moreover, the > Flink Kafka consumer will try all of those servers one by one, failing to > get partition information. > The suggested improvement is to fail fast and inform the user that the > servers provided in the 'bootstrap.servers' config are invalid. If at least > one server is valid, then the exception should not be thrown. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2397: [FLINK-4439] Validate 'bootstrap.servers' config in flink...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2397 Thank you. The pull request is now good to be merged! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523695#comment-15523695 ] ASF GitHub Bot commented on FLINK-4035: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2369 @tzulitai I addressed all your comments except the one relating to `FlinkKafkaProducer010Configuration`: I had a quick offline discussion with @StephanEwen about the issue and he suggested adding the timestamp to the regular sink interface. But I would like to make that change separately from this one, and merge the Kafka 0.10 support as-is. This will make it easier for people to try it out now and provide us with feedback. Also, I think some other Kafka-related pull requests are blocked on this one. @tzulitai, could you do a final pass over the changes? If you agree, I'd like to merge it afterwards. > Bump Kafka producer in Kafka sink to Kafka 0.10.0.0 > --- > > Key: FLINK-4035 > URL: https://issues.apache.org/jira/browse/FLINK-4035 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Minor > > Kafka 0.10.0.0 introduced protocol changes related to the producer. > Published messages now include timestamps and compressed messages now include > relative offsets. As it is now, brokers must decompress publisher-compressed > messages, assign offsets to them, and recompress them, which is wasteful and > makes it less likely that compression will be used at all. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2369 @tzulitai I addressed all your comments except the one relating to `FlinkKafkaProducer010Configuration`: I had a quick offline discussion with @StephanEwen about the issue and he suggested adding the timestamp to the regular sink interface. But I would like to make that change separately from this one, and merge the Kafka 0.10 support as-is. This will make it easier for people to try it out now and provide us with feedback. Also, I think some other Kafka-related pull requests are blocked on this one. @tzulitai, could you do a final pass over the changes? If you agree, I'd like to merge it afterwards. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
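As context for the pull request under discussion, a hedged usage sketch of attaching the 0.10 sink so that Flink record timestamps end up in the Kafka messages. The helper name, the surrounding package paths, and the execution skeleton are assumptions based on this thread and the connector under review, not the confirmed merged API.

{code}
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Illustrative only: minimal job that writes a stream to Kafka 0.10 with timestamps.
public class Kafka010SinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c"); // placeholder source

        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "localhost:9092");

        // Attach the 0.10 producer; the returned configuration object can be used to tune the sink.
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(
            stream, "my-topic", new SimpleStringSchema(), producerConfig);

        env.execute("kafka-0.10-sink-sketch");
    }
}
{code}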
[GitHub] flink pull request #2529: [FLINK-4241] [table] Cryptic expression parser exc...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2529 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4241) Cryptic expression parser exceptions
[ https://issues.apache.org/jira/browse/FLINK-4241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523636#comment-15523636 ] ASF GitHub Bot commented on FLINK-4241: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2529 > Cryptic expression parser exceptions > > > Key: FLINK-4241 > URL: https://issues.apache.org/jira/browse/FLINK-4241 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Timo Walther > Fix For: 1.2.0 > > > The exceptions thrown when giving wrong SQL syntax to Flink's SQL parser is > very cryptic and should be improved. For example, the following code snippet: > {code} > inputTable.filter("a == 0"); > {code} > gives the following exception: > {code} > Exception in thread "main" > org.apache.flink.api.table.ExpressionParserException: Could not parse > expression: [1.4] failure: `-' expected but `=' found > a == 0 >^ > at > org.apache.flink.api.table.expressions.ExpressionParser$.parseExpression(ExpressionParser.scala:355) > at org.apache.flink.api.table.Table.filter(table.scala:161) > at > com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:32) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} > From this description it is very hard to understand that {{==}} is not a > valid operator. > Another example is: > {code} > inputTable.select("*"); > {code} > which gives > {code} > Exception in thread "main" > org.apache.flink.api.table.ExpressionParserException: Could not parse > expression: Base Failure > at > org.apache.flink.api.table.expressions.ExpressionParser$.parseExpressionList(ExpressionParser.scala:342) > at org.apache.flink.api.table.Table.select(table.scala:103) > at > com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:33) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} > I think it would considerably improve user experience if we print more > helpful parsing exceptions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4241) Cryptic expression parser exceptions
[ https://issues.apache.org/jira/browse/FLINK-4241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-4241. - Resolution: Fixed Fix Version/s: 1.2.0 Fixed in ef15984988e883ced5311b332e5e5d8521c9573f. > Cryptic expression parser exceptions > > > Key: FLINK-4241 > URL: https://issues.apache.org/jira/browse/FLINK-4241 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Timo Walther > Fix For: 1.2.0 > > > The exceptions thrown when giving wrong SQL syntax to Flink's SQL parser is > very cryptic and should be improved. For example, the following code snippet: > {code} > inputTable.filter("a == 0"); > {code} > gives the following exception: > {code} > Exception in thread "main" > org.apache.flink.api.table.ExpressionParserException: Could not parse > expression: [1.4] failure: `-' expected but `=' found > a == 0 >^ > at > org.apache.flink.api.table.expressions.ExpressionParser$.parseExpression(ExpressionParser.scala:355) > at org.apache.flink.api.table.Table.filter(table.scala:161) > at > com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:32) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} > From this description it is very hard to understand that {{==}} is not a > valid operator. > Another example is: > {code} > inputTable.select("*"); > {code} > which gives > {code} > Exception in thread "main" > org.apache.flink.api.table.ExpressionParserException: Could not parse > expression: Base Failure > at > org.apache.flink.api.table.expressions.ExpressionParser$.parseExpressionList(ExpressionParser.scala:342) > at org.apache.flink.api.table.Table.select(table.scala:103) > at > com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:33) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} > I think it would considerably improve user experience if we print more > helpful parsing exceptions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4687) Add getAddress method to RpcService
[ https://issues.apache.org/jira/browse/FLINK-4687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523629#comment-15523629 ] ASF GitHub Bot commented on FLINK-4687: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2551 [FLINK-4687] [rpc] Add getAddress to RpcService Adds the `getAddress` method to the `RpcService`, which allows retrieving the address under which the `RpcService` is reachable. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink extendRpcGetAddress Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2551.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2551 commit caebd864c9c2b6e2fb1c777e4b84cbc1c02a87d7 Author: Till Rohrmann Date: 2016-09-26T16:01:47Z [FLINK-4687] [rpc] Add getAddress to RpcService > Add getAddress method to RpcService > --- > > Key: FLINK-4687 > URL: https://issues.apache.org/jira/browse/FLINK-4687 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > It would be useful to expose the hostname to which the {{RpcService}} has > been bound. This can then be used to retrieve the network interface which is > reachable from the outside. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2551: [FLINK-4687] [rpc] Add getAddress to RpcService
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2551 [FLINK-4687] [rpc] Add getAddress to RpcService Adds the `getAddress` method to the `RpcService`, which allows retrieving the address under which the `RpcService` is reachable. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink extendRpcGetAddress Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2551.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2551 commit caebd864c9c2b6e2fb1c777e4b84cbc1c02a87d7 Author: Till Rohrmann Date: 2016-09-26T16:01:47Z [FLINK-4687] [rpc] Add getAddress to RpcService --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4687) Add getAddress method to RpcService
Till Rohrmann created FLINK-4687: Summary: Add getAddress method to RpcService Key: FLINK-4687 URL: https://issues.apache.org/jira/browse/FLINK-4687 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Affects Versions: 1.2.0 Reporter: Till Rohrmann Assignee: Till Rohrmann It would be useful to expose the hostname to which the {{RpcService}} has been bound. This can then be used to retrieve the network interface which is reachable from the outside. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
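A minimal sketch of what the addition could look like on the interface; the rest of {{RpcService}} is elided and the Javadoc wording is illustrative:

{code}
// Illustrative only: the new accessor on the RpcService interface.
public interface RpcService {

    // ... existing methods for connecting to and starting rpc endpoints are elided ...

    /**
     * Returns the address (e.g. the hostname) under which this RpcService is reachable,
     * which callers can use to determine the externally reachable network interface.
     */
    String getAddress();
}
{code}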
[jira] [Resolved] (FLINK-4252) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-4252. - Resolution: Fixed Fix Version/s: 1.2.0 Fixed in f150f987772c8d96f41a5acd1d20cba6622cb5c9. > Table program cannot be compiled > > > Key: FLINK-4252 > URL: https://issues.apache.org/jira/browse/FLINK-4252 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 > Environment: OS X EI Captain > scala 2.11.7 > jdk 8 >Reporter: Renkai Ge >Assignee: Timo Walther > Fix For: 1.2.0 > > Attachments: TestMain.scala > > > I'm trying the table apis. > I got some errors like this > My code is in the attachments > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) > at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672) > at TestMain$.main(TestMain.scala:31) > at TestMain.main(TestMain.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: The user defined 'open(Configuration)' method > in class org.apache.flink.api.table.runtime.FlatMapRunner caused an > exception: Table program cannot be compiled. This is a bug. Please file an > issue. > at > org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337) > at > org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47) > at > org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471) > at > org.apache.flink.runtime.operators.BatchTask.inv
[GitHub] flink pull request #2507: [FLINK-4252] [table] Validate input and output cla...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2507 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4252) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523617#comment-15523617 ] ASF GitHub Bot commented on FLINK-4252: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2507 > Table program cannot be compiled > > > Key: FLINK-4252 > URL: https://issues.apache.org/jira/browse/FLINK-4252 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 > Environment: OS X EI Captain > scala 2.11.7 > jdk 8 >Reporter: Renkai Ge >Assignee: Timo Walther > Attachments: TestMain.scala > > > I'm trying the table apis. > I got some errors like this > My code is in the attachments > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) > at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672) > at TestMain$.main(TestMain.scala:31) > at TestMain.main(TestMain.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: The user defined 'open(Configuration)' method > in class org.apache.flink.api.table.runtime.FlatMapRunner caused an > exception: Table program cannot be compiled. This is a bug. Please file an > issue. > at > org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337) > at > org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47) > at > org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471) > at > org.apache.flink.runtime.o
[jira] [Resolved] (FLINK-4590) Some Table API tests are failing when debug lvl is set to DEBUG
[ https://issues.apache.org/jira/browse/FLINK-4590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-4590. - Resolution: Fixed Fix Version/s: 1.2.0 Fixed in 7eb45c133c49933b14719f06bf68ccf162a3e0b2. > Some Table API tests are failing when debug lvl is set to DEBUG > --- > > Key: FLINK-4590 > URL: https://issues.apache.org/jira/browse/FLINK-4590 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Timo Walther > Fix For: 1.2.0 > > > For debugging another issue, I've set the log level on travis to DEBUG. > After that, the Table API tests started failing > {code} > Failed tests: > SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error > occurred while applying rule DataSetScanRule > SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error > occurred while applying rule DataSetScanRule > SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error > occurred while applying rule DataSetScanRule > SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error > occurred while applying rule DataSetScanRule > SetOperatorsITCase.testMinus:175 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinus:175 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinus:175 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinus:175 Internal error: Error occurred while > applying rule DataSetScanRule > {code} > Probably Calcite is executing additional assertions depending on the debug > level. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4590) Some Table API tests are failing when debug lvl is set to DEBUG
[ https://issues.apache.org/jira/browse/FLINK-4590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523591#comment-15523591 ] ASF GitHub Bot commented on FLINK-4590: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2504 > Some Table API tests are failing when debug lvl is set to DEBUG > --- > > Key: FLINK-4590 > URL: https://issues.apache.org/jira/browse/FLINK-4590 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Timo Walther > > For debugging another issue, I've set the log level on travis to DEBUG. > After that, the Table API tests started failing > {code} > Failed tests: > SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error > occurred while applying rule DataSetScanRule > SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error > occurred while applying rule DataSetScanRule > SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error > occurred while applying rule DataSetScanRule > SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error > occurred while applying rule DataSetScanRule > SetOperatorsITCase.testMinus:175 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinus:175 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinus:175 Internal error: Error occurred while > applying rule DataSetScanRule > SetOperatorsITCase.testMinus:175 Internal error: Error occurred while > applying rule DataSetScanRule > {code} > Probably Calcite is executing additional assertions depending on the debug > level. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2504: [FLINK-4590] [table] Some Table API tests are fail...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2504 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-4686) Add possibility to get column names
[ https://issues.apache.org/jira/browse/FLINK-4686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-4686: Labels: starter (was: ) > Add possibility to get column names > --- > > Key: FLINK-4686 > URL: https://issues.apache.org/jira/browse/FLINK-4686 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther > Labels: starter > > For debugging and maybe for visualization in the future (e.g. in a shell) it > would be good to have the possibility to get the names of {{Table}} columns. > At the moment the user has no idea how the table columns are named, e.g. if they > need to be matched with POJO fields. > My suggestion: > {code} > Schema schema = table.schema(); > TypeInformation<?> type1 = schema.getType(1); > TypeInformation<?> type2 = schema.getType("col"); > String name = schema.getColumnName(1); > String[] names = schema.getColumnNames(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4686) Add possibility to get column names
Timo Walther created FLINK-4686: --- Summary: Add possibility to get column names Key: FLINK-4686 URL: https://issues.apache.org/jira/browse/FLINK-4686 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther For debugging and maybe for visualization in the future (e.g. in a shell) it would be good to have the possibility to get the names of {{Table}} columns. At the moment the user has no idea how the table columns are named, e.g. if they need to be matched with POJO fields. My suggestion: {code} Schema schema = table.schema(); TypeInformation<?> type1 = schema.getType(1); TypeInformation<?> type2 = schema.getType("col"); String name = schema.getColumnName(1); String[] names = schema.getColumnNames(); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
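A hedged sketch of a backing class for the accessors suggested above; the class name and structure are illustrative, not the final API:

{code}
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Illustrative only: column names and types stored side by side, exposed through
// the accessors suggested in the issue description.
public class TableSchemaSketch {

    private final String[] columnNames;
    private final TypeInformation<?>[] columnTypes;

    public TableSchemaSketch(String[] columnNames, TypeInformation<?>[] columnTypes) {
        if (columnNames.length != columnTypes.length) {
            throw new IllegalArgumentException("Number of column names and types must match");
        }
        this.columnNames = columnNames;
        this.columnTypes = columnTypes;
    }

    public String getColumnName(int index) {
        return columnNames[index];
    }

    public String[] getColumnNames() {
        return columnNames.clone();
    }

    public TypeInformation<?> getType(int index) {
        return columnTypes[index];
    }

    public TypeInformation<?> getType(String columnName) {
        for (int i = 0; i < columnNames.length; i++) {
            if (columnNames[i].equals(columnName)) {
                return columnTypes[i];
            }
        }
        throw new IllegalArgumentException("Unknown column: " + columnName);
    }
}
{code}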
[GitHub] flink pull request #2535: [FLINK-4662] Bump Calcite version up to 1.9
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2535 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release
[ https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-2765: -- Description: Currently 0.98.11 is used: {code} 0.98.11-hadoop2 {code} Stable release for hadoop-2 is 1.2.x line We should upgrade to 1.2.1 was: Currently 0.98.11 is used: {code} 0.98.11-hadoop2 {code} Stable release for hadoop-2 is 1.1.x line We should upgrade to 1.2.1 > Upgrade hbase version for hadoop-2 to 1.2 release > - > > Key: FLINK-2765 > URL: https://issues.apache.org/jira/browse/FLINK-2765 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Priority: Minor > > Currently 0.98.11 is used: > {code} > 0.98.11-hadoop2 > {code} > Stable release for hadoop-2 is 1.2.x line > We should upgrade to 1.2.1 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3734: -- Description: {code} DataInputView in = inputState.getState(getUserCodeClassloader()); final long nextEvaluationTime = in.readLong(); final long nextSlideTime = in.readLong(); AbstractKeyedTimePanes panes = createPanes(keySelector, function); panes.readFromInput(in, keySerializer, stateTypeSerializer); restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime); } {code} DataInputView in is not closed upon return. was: {code} DataInputView in = inputState.getState(getUserCodeClassloader()); final long nextEvaluationTime = in.readLong(); final long nextSlideTime = in.readLong(); AbstractKeyedTimePanes panes = createPanes(keySelector, function); panes.readFromInput(in, keySerializer, stateTypeSerializer); restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime); } {code} DataInputView in is not closed upon return. > Unclosed DataInputView in > AbstractAlignedProcessingTimeWindowOperator#restoreState() > > > Key: FLINK-3734 > URL: https://issues.apache.org/jira/browse/FLINK-3734 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > DataInputView in = inputState.getState(getUserCodeClassloader()); > final long nextEvaluationTime = in.readLong(); > final long nextSlideTime = in.readLong(); > AbstractKeyedTimePanes panes = > createPanes(keySelector, function); > panes.readFromInput(in, keySerializer, stateTypeSerializer); > restoredState = new RestoredState<>(panes, nextEvaluationTime, > nextSlideTime); > } > {code} > DataInputView in is not closed upon return. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
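A hedged sketch of the fix pattern for the fragment quoted above: close the view once restore is done, assuming the concrete view wraps a closeable stream (the field and helper names are taken from the quoted code):

{code}
// Illustrative only: same logic as the quoted fragment, with the view closed in a
// finally block when its concrete implementation is closeable.
DataInputView in = inputState.getState(getUserCodeClassloader());
try {
    final long nextEvaluationTime = in.readLong();
    final long nextSlideTime = in.readLong();

    AbstractKeyedTimePanes panes = createPanes(keySelector, function);
    panes.readFromInput(in, keySerializer, stateTypeSerializer);

    restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
} finally {
    if (in instanceof java.io.Closeable) {
        ((java.io.Closeable) in).close(); // release the underlying stream even if reading fails
    }
}
{code}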
[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3
[ https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3801: -- Description: Currently Joda-Time 2.5 is used, which is very old. We should upgrade to 2.9.3 was: Currently Joda-Time 2.5 is used, which is very old. We should upgrade to 2.9.3 > Upgrade Joda-Time library to 2.9.3 > -- > > Key: FLINK-3801 > URL: https://issues.apache.org/jira/browse/FLINK-3801 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Priority: Minor > > Currently Joda-Time 2.5 is used, which is very old. > We should upgrade to 2.9.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3222) Incorrect shift amount in OperatorCheckpointStats#hashCode()
[ https://issues.apache.org/jira/browse/FLINK-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3222: -- Description: Here is related code: {code} result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length >>> 32)); {code} subTaskStats.length is an int. The shift amount is greater than 31 bits. was: Here is related code: {code} result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length >>> 32)); {code} subTaskStats.length is an int. The shift amount is greater than 31 bits. > Incorrect shift amount in OperatorCheckpointStats#hashCode() > > > Key: FLINK-3222 > URL: https://issues.apache.org/jira/browse/FLINK-3222 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > Here is related code: > {code} > result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length > >>> 32)); > {code} > subTaskStats.length is an int. > The shift amount is greater than 31 bits. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
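For reference, shifting an {{int}} right by 32 bits is a no-op (shift distances are taken modulo 32), so the quoted expression always evaluates to 0 and the field never influences the hash. A small illustrative sketch with hypothetical field names:

{code}
// Illustrative only: hashCode contributions for an int field and a long field.
class StatsHashSketch {

    private final int subTaskStatsLength;   // stands in for subTaskStats.length (an int)
    private final long someLongField;       // hypothetical long field, shown for contrast

    StatsHashSketch(int subTaskStatsLength, long someLongField) {
        this.subTaskStatsLength = subTaskStatsLength;
        this.someLongField = someLongField;
    }

    @Override
    public int hashCode() {
        int result = 17;
        // int field: mix it in directly; 'x ^ (x >>> 32)' would always be 0 for an int
        result = 31 * result + subTaskStatsLength;
        // long field: the fold-the-halves idiom is meant for longs
        result = 31 * result + (int) (someLongField ^ (someLongField >>> 32));
        return result;
    }
}
{code}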
[jira] [Updated] (FLINK-4573) Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler
[ https://issues.apache.org/jira/browse/FLINK-4573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-4573: -- Description: {code} try { raf = new RandomAccessFile(file, "r"); } catch (FileNotFoundException e) { display(ctx, request, "Displaying TaskManager log failed."); LOG.error("Displaying TaskManager log failed.", e); return; } long fileLength = raf.length(); final FileChannel fc = raf.getChannel(); {code} If length() throws IOException, raf would be left unclosed. was: {code} try { raf = new RandomAccessFile(file, "r"); } catch (FileNotFoundException e) { display(ctx, request, "Displaying TaskManager log failed."); LOG.error("Displaying TaskManager log failed.", e); return; } long fileLength = raf.length(); final FileChannel fc = raf.getChannel(); {code} If length() throws IOException, raf would be left unclosed. > Potential resource leak due to unclosed RandomAccessFile in > TaskManagerLogHandler > - > > Key: FLINK-4573 > URL: https://issues.apache.org/jira/browse/FLINK-4573 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > try { > raf = new > RandomAccessFile(file, "r"); > } catch > (FileNotFoundException e) { > display(ctx, request, > "Displaying TaskManager log failed."); > LOG.error("Displaying > TaskManager log failed.", e); > return; > } > long fileLength = > raf.length(); > final FileChannel fc = > raf.getChannel(); > {code} > If length() throws IOException, raf would be left unclosed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
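A hedged sketch of the quoted fragment with the leak closed: if anything after the successful open throws, the file handle is released before the exception propagates (the surrounding names mirror the quoted snippet):

{code}
// Illustrative only: same structure as the quoted code, but the RandomAccessFile
// is closed when length() or getChannel() fails.
RandomAccessFile raf;
try {
    raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException e) {
    display(ctx, request, "Displaying TaskManager log failed.");
    LOG.error("Displaying TaskManager log failed.", e);
    return;
}

final long fileLength;
final FileChannel fc;
try {
    fileLength = raf.length();
    fc = raf.getChannel();
} catch (IOException e) {
    raf.close(); // release the file handle instead of leaking it
    throw e;
}
{code}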
[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-4534: -- Description: Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState(): {code} for (BucketState bucketState : state.bucketStates.values()) { {code} and following in close(): {code} for (Map.Entry> entry : state.bucketStates.entrySet()) { closeCurrentPartFile(entry.getValue()); {code} w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue starting line 752: {code} Set pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); LOG.debug("Moving pending files to final location on restore."); for (Long pastCheckpointId : pastCheckpointIds) { {code} was: Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState(): {code} for (BucketState bucketState : state.bucketStates.values()) { {code} and following in close(): {code} for (Map.Entry> entry : state.bucketStates.entrySet()) { closeCurrentPartFile(entry.getValue()); {code} w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue starting line 752: {code} Set pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); LOG.debug("Moving pending files to final location on restore."); for (Long pastCheckpointId : pastCheckpointIds) { {code} > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState bucketState : state.bucketStates.values()) { > {code} > and following in close(): > {code} > for (Map.Entry> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue > starting line 752: > {code} > Set pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
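A hedged sketch of guarding the restore-time iteration with a shared monitor; the actual lock object used by the sink's other methods may differ, and the helper name is hypothetical:

{code}
// Illustrative only: perform the traversal under the same lock as the other methods
// so it cannot interleave with concurrent modifications of the bucket states.
synchronized (state.bucketStates) {
    for (Map.Entry<String, BucketState> entry : state.bucketStates.entrySet()) {
        handleRestoredBucketState(entry.getValue()); // hypothetical helper with the per-bucket restore logic
    }
}
{code}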
[jira] [Commented] (FLINK-4662) Bump Calcite version up to 1.9
[ https://issues.apache.org/jira/browse/FLINK-4662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523472#comment-15523472 ] ASF GitHub Bot commented on FLINK-4662: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2535 > Bump Calcite version up to 1.9 > -- > > Key: FLINK-4662 > URL: https://issues.apache.org/jira/browse/FLINK-4662 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > Fix For: 1.2.0 > > > Calcite just released the 1.9 version. We should adopt it also in the Table > API especially for FLINK-4294. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4685) Gather operator checkpoint durations data sizes from the runtime
Stephan Ewen created FLINK-4685: --- Summary: Gather operator checkpoint durations data sizes from the runtime Key: FLINK-4685 URL: https://issues.apache.org/jira/browse/FLINK-4685 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Affects Versions: 1.1.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.2.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-4410: Description: Checkpoint statistics contain the duration of a checkpoint, measured as from the CheckpointCoordinator's start to the point when the acknowledge message came. We should additionally expose - duration of the synchronous part of a checkpoint - duration of the asynchronous part of a checkpoint - number of bytes buffered during the stream alignment phase - duration of the stream alignment phase Note: In the case of using *at-least once* semantics, the latter two will always be zero. was:Checkpoint statistics contain the duration of a checkpoint. We should split this time into the synchronous and asynchronous part. This will give more insight into the inner workings of the checkpointing mechanism and help users better understand what's going on. > Report more information about operator checkpoints > -- > > Key: FLINK-4410 > URL: https://issues.apache.org/jira/browse/FLINK-4410 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Webfrontend >Affects Versions: 1.1.2 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen >Priority: Minor > Fix For: 1.2.0 > > > Checkpoint statistics contain the duration of a checkpoint, measured as from > the CheckpointCoordinator's start to the point when the acknowledge message > came. > We should additionally expose > - duration of the synchronous part of a checkpoint > - duration of the asynchronous part of a checkpoint > - number of bytes buffered during the stream alignment phase > - duration of the stream alignment phase > Note: In the case of using *at-least once* semantics, the latter two will > always be zero. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
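As a rough illustration of the figures the issue wants to expose, a hypothetical holder type is sketched below; the names are assumptions for illustration only, not Flink's actual reporting API.
{code}
/** Illustrative-only container for the additional per-operator checkpoint figures listed above. */
public class OperatorCheckpointStatsSketch {

    private final long syncDurationMillis;       // synchronous part of the snapshot
    private final long asyncDurationMillis;      // asynchronous part of the snapshot
    private final long alignmentBufferedBytes;   // bytes buffered during stream alignment
    private final long alignmentDurationMillis;  // duration of the alignment phase

    public OperatorCheckpointStatsSketch(
            long syncDurationMillis,
            long asyncDurationMillis,
            long alignmentBufferedBytes,
            long alignmentDurationMillis) {
        this.syncDurationMillis = syncDurationMillis;
        this.asyncDurationMillis = asyncDurationMillis;
        // with at-least-once semantics, both alignment values are expected to be zero
        this.alignmentBufferedBytes = alignmentBufferedBytes;
        this.alignmentDurationMillis = alignmentDurationMillis;
    }

    /** Total snapshotting time across both phases. */
    public long totalSnapshotDurationMillis() {
        return syncDurationMillis + asyncDurationMillis;
    }
}
{code}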
[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-4410: Fix Version/s: 1.2.0 > Report more information about operator checkpoints > -- > > Key: FLINK-4410 > URL: https://issues.apache.org/jira/browse/FLINK-4410 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Webfrontend >Affects Versions: 1.1.2 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > Checkpoint statistics contain the duration of a checkpoint, measured as from > the CheckpointCoordinator's start to the point when the acknowledge message > came. > We should additionally expose > - duration of the synchronous part of a checkpoint > - duration of the asynchronous part of a checkpoint > - number of bytes buffered during the stream alignment phase > - duration of the stream alignment phase > Note: In the case of using *at-least once* semantics, the latter two will > always be zero. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-4410: Affects Version/s: 1.1.2 > Report more information about operator checkpoints > -- > > Key: FLINK-4410 > URL: https://issues.apache.org/jira/browse/FLINK-4410 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Webfrontend >Affects Versions: 1.1.2 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > Checkpoint statistics contain the duration of a checkpoint, measured as from > the CheckpointCoordinator's start to the point when the acknowledge message > came. > We should additionally expose > - duration of the synchronous part of a checkpoint > - duration of the asynchronous part of a checkpoint > - number of bytes buffered during the stream alignment phase > - duration of the stream alignment phase > Note: In the case of using *at-least once* semantics, the latter two will > always be zero. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-4410: Priority: Major (was: Minor) > Report more information about operator checkpoints > -- > > Key: FLINK-4410 > URL: https://issues.apache.org/jira/browse/FLINK-4410 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Webfrontend >Affects Versions: 1.1.2 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > Checkpoint statistics contain the duration of a checkpoint, measured as from > the CheckpointCoordinator's start to the point when the acknowledge message > came. > We should additionally expose > - duration of the synchronous part of a checkpoint > - duration of the asynchronous part of a checkpoint > - number of bytes buffered during the stream alignment phase > - duration of the stream alignment phase > Note: In the case of using *at-least once* semantics, the latter two will > always be zero. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-4410: Component/s: State Backends, Checkpointing > Report more information about operator checkpoints > -- > > Key: FLINK-4410 > URL: https://issues.apache.org/jira/browse/FLINK-4410 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Webfrontend >Affects Versions: 1.1.2 >Reporter: Ufuk Celebi >Assignee: Stephan Ewen > Fix For: 1.2.0 > > > Checkpoint statistics contain the duration of a checkpoint, measured as from > the CheckpointCoordinator's start to the point when the acknowledge message > came. > We should additionally expose > - duration of the synchronous part of a checkpoint > - duration of the asynchronous part of a checkpoint > - number of bytes buffered during the stream alignment phase > - duration of the stream alignment phase > Note: In the case of using *at-least once* semantics, the latter two will > always be zero. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-4410: Summary: Report more information about operator checkpoints (was: Split checkpoint times into synchronous and asynchronous part) > Report more information about operator checkpoints > -- > > Key: FLINK-4410 > URL: https://issues.apache.org/jira/browse/FLINK-4410 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Ufuk Celebi >Assignee: Stephan Ewen >Priority: Minor > > Checkpoint statistics contain the duration of a checkpoint. We should split > this time into the synchronous and asynchronous part. This will give more > insight into the inner workings of the checkpointing mechanism and help users > better understand what's going on. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2550: [Flink-4657] Implement HighAvailabilityServices ba...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/2550 [Flink-4657] Implement HighAvailabilityServices based on zookeeper This actually contains 3 commits. More details can be found here: https://github.com/StephanEwen/incubator-flink/pull/15 You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-4657 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2550.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2550 commit b1e9db28d55ef53c58d17bd0fa73da05a1b6570a Author: Kurt Young Date: 2016-09-26T02:59:16Z [FLINK-4657] Add contains() to submitted job graph store, to indicate whether a job needs to be run. commit b0c88a04b0461a84a05aaba1d649b59a7e13fe7b Author: Kurt Young Date: 2016-09-22T01:07:13Z [FLINK-4657] Implement HighAvailabilityServices based on zookeeper commit b8801720c044cba10562ae8cc5b2770cb21cf07a Author: Kurt Young Date: 2016-09-26T15:36:04Z [FLINK-4657] Implement a few rpc calls for JobMaster --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
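To hint at what the first commit describes, a hypothetical fragment of a submitted-job-graph store with a contains() query is sketched below; the interface shape is an assumption, not the actual Flink interface.
{code}
/** Illustrative-only shape of a submitted-job-graph store with the new contains() query. */
public interface SubmittedJobGraphStoreSketch {

    /** Stores the serialized job graph under the given job id. */
    void putJobGraph(String jobId, byte[] serializedJobGraph) throws Exception;

    /** Recovers the serialized job graph for the given job id, or null if unknown. */
    byte[] recoverJobGraph(String jobId) throws Exception;

    /** New query: lets recovery decide whether a job still needs to be run. */
    boolean contains(String jobId) throws Exception;
}
{code}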
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523361#comment-15523361 ] ASF GitHub Bot commented on FLINK-4294: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r80500304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- I think we don't need a wildcard. I will change this PR to support `field$subfield`. > Allow access of composite type fields > - > > Key: FLINK-4294 > URL: https://issues.apache.org/jira/browse/FLINK-4294 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It > would be better to access individual fields of composite types, too. e.g. > {code} > SELECT composite.name FROM composites > SELECT tuple.f0 FROM tuples > 'f0.getField(0) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r80500304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- I think we don't need a wildcard. I will change this PR to support `field$subfield`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter
[ https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523352#comment-15523352 ] ASF GitHub Bot commented on FLINK-4564: --- Github user ex00 commented on a diff in the pull request: https://github.com/apache/flink/pull/2517#discussion_r80499420 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java --- @@ -219,9 +249,16 @@ public ScopeFormats getScopeFormats() { public void register(Metric metric, String metricName, MetricGroup group) { --- End diff -- In this case I get a compile error for classes that extend MetricRegistry, for example org.apache.flink.runtime.metrics.groups.TaskMetricGroupTest.CountingMetricRegistry#register(Metric, String, MetricGroup) and org.apache.flink.runtime.metrics.groups.MetricGroupTest.ExceptionOnRegisterRegistry#register(Metric, String, MetricGroup): "method does not override or implement a method from a supertype". These are simple test classes, so I can change the method signatures there. But won't this change to MetricRegistry cause problems in the future? > [metrics] Delimiter should be configured per reporter > - > > Key: FLINK-4564 > URL: https://issues.apache.org/jira/browse/FLINK-4564 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Anton Mushin > > Currently, the delimiter used for the scope string is based on a configuration > setting shared by all reporters. However, different reporters may have > different requirements in regards to the delimiter, as such we should allow > reporters to use a different delimiter. > We can keep the current setting as a global setting that is used if no > specific setting was set. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2517: [FLINK-4564] [metrics] Delimiter should be configu...
Github user ex00 commented on a diff in the pull request: https://github.com/apache/flink/pull/2517#discussion_r80499420 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java --- @@ -219,9 +249,16 @@ public ScopeFormats getScopeFormats() { public void register(Metric metric, String metricName, MetricGroup group) { --- End diff -- In this case I get a compile error for classes that extend MetricRegistry, for example org.apache.flink.runtime.metrics.groups.TaskMetricGroupTest.CountingMetricRegistry#register(Metric, String, MetricGroup) and org.apache.flink.runtime.metrics.groups.MetricGroupTest.ExceptionOnRegisterRegistry#register(Metric, String, MetricGroup): "method does not override or implement a method from a supertype". These are simple test classes, so I can change the method signatures there. But won't this change to MetricRegistry cause problems in the future? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
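A small, runnable sketch of the fallback behaviour described in FLINK-4564: a reporter-specific delimiter wins, and the global setting applies only when no specific one is configured. The configuration keys used here are placeholders, not Flink's actual option names.
{code}
import java.util.Properties;

public class DelimiterConfigSketch {

    /** Returns the reporter-specific delimiter if set, otherwise the global one. */
    static String delimiterFor(String reporterName, Properties config) {
        String globalDelimiter = config.getProperty("metrics.scope.delimiter", ".");
        return config.getProperty(
                "metrics.reporter." + reporterName + ".scope.delimiter", globalDelimiter);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("metrics.scope.delimiter", ".");
        config.setProperty("metrics.reporter.graphite.scope.delimiter", "-");

        System.out.println(delimiterFor("graphite", config)); // "-" (reporter-specific)
        System.out.println(delimiterFor("jmx", config));      // "." (global fallback)
    }
}
{code}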
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r80496915 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java --- @@ -60,12 +60,17 @@ protected void assignPartitionsToConsumer(KafkaConsumer consumer consumer.assign(topicPartitions); } + @Override + protected void emitRecord(T record, KafkaTopicPartitionState partition, long offset, ConsumerRecord consumerRecord) throws Exception { + // pass timestamp + super.emitRecord(record, partition, offset, consumerRecord.timestamp()); + } + /** * Emit record Kafka-timestamp aware. */ @Override - protected void emitRecord(T record, KafkaTopicPartitionState partitionState, long offset, R kafkaRecord) throws Exception { - long timestamp = ((ConsumerRecord) kafkaRecord).timestamp(); + protected void emitRecord(T record, KafkaTopicPartitionState partitionState, long offset, long timestamp) throws Exception { if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { --- End diff -- Yes, I'll do that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
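To make the shape of that refactoring easier to follow, here is a self-contained sketch of the overload pattern visible in the diff: the base fetcher emits with a plain long timestamp, and a 0.10-style subclass extracts the timestamp from the consumer record and delegates. All class names are simplified stand-ins, not the actual Flink or Kafka classes.
{code}
public class EmitRecordOverloadSketch {

    /** Stand-in for a Kafka 0.10 ConsumerRecord that carries a timestamp. */
    static class RecordWithTimestamp<T> {
        final T value;
        final long timestamp;
        RecordWithTimestamp(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Stand-in for the base fetcher: emits records together with a timestamp. */
    static class BaseFetcher<T> {
        protected void emitRecord(T record, long offset, long timestamp) {
            System.out.println("record=" + record + " offset=" + offset + " ts=" + timestamp);
        }
    }

    /** Stand-in for the 0.10 fetcher: pulls the timestamp out of the record and delegates. */
    static class Kafka010StyleFetcher<T> extends BaseFetcher<T> {
        protected void emitRecord(T record, long offset, RecordWithTimestamp<T> consumerRecord) {
            super.emitRecord(record, offset, consumerRecord.timestamp);
        }
    }

    public static void main(String[] args) {
        Kafka010StyleFetcher<String> fetcher = new Kafka010StyleFetcher<>();
        fetcher.emitRecord("hello", 42L, new RecordWithTimestamp<>("hello", 1474900000000L));
    }
}
{code}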
[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2509 I'll rebase this PR soon, probably will also wait for Kafka 0.10 connector to be merged. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523340#comment-15523340 ] ASF GitHub Bot commented on FLINK-4280: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2509 I'll rebase this PR soon, probably will also wait for Kafka 0.10 connector to be merged. > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > will {{auto.offset.reset}} be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users that may need some > collaboration with existing non-Flink Kafka consumer applications. > How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "...") // this won't have effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to lookup external > offsets in ZK / broker on startup > ... > {code} > A thing we would need to decide on is what would the default value be for > {{flink.starting-position}}. > Two merits I see in adding this: > 1. This compensates the way users generally interpret "read from a starting > position". As the Flink Kafka connector is somewhat essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. 
I > think adding this "decouples" more Flink's internal offset checkpointing from > the external Kafka's offset store. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2369 I'm currently working on rebasing the PR and addressing the comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523334#comment-15523334 ] ASF GitHub Bot commented on FLINK-4035: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2369 I'm currently working on rebasing the PR and addressing the comments. > Bump Kafka producer in Kafka sink to Kafka 0.10.0.0 > --- > > Key: FLINK-4035 > URL: https://issues.apache.org/jira/browse/FLINK-4035 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Minor > > Kafka 0.10.0.0 introduced protocol changes related to the producer. > Published messages now include timestamps and compressed messages now include > relative offsets. As it is now, brokers must decompress publisher compressed > messages, assign offset to them, and recompress them, which is wasteful and > makes it less likely that compression will be used at all. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4684) Remove obsolete classloader from CheckpointCoordinator
Stephan Ewen created FLINK-4684: --- Summary: Remove obsolete classloader from CheckpointCoordinator Key: FLINK-4684 URL: https://issues.apache.org/jira/browse/FLINK-4684 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Priority: Minor Fix For: 1.2.0 With the latest checkpointing changes, the {{CheckpointCoordinator}} should not execute user code any more, and thus should not use a User Code ClassLoader any more. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523328#comment-15523328 ] ASF GitHub Bot commented on FLINK-4280: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2509 Hi @gyfora, Yes, it is absolutely possible to add that. There's actually a JIRA for that feature too ([FLINK-3123](https://issues.apache.org/jira/browse/FLINK-3123)), so I'd say we can add that feature on top of the proposed changes here, as a separate follow up PR after this one? One note though, the API for that feature would need to be able to specify offsets for partitions of different topics, since the Kafka consumers can subscribe multiple topics. So, `Map` wouldn't fit this case, probably would be better off having a new user-facing class as the argument to define the offsets. > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > will {{auto.offset.reset}} be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users that may need some > collaboration with existing non-Flink Kafka consumer applications. > How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "...") // this won't have effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to lookup external > offsets in ZK / broker on startup > ... > {code} > A thing we would need to decide on is what would the default value be for > {{flink.starting-position}}. > Two merits I see in adding this: > 1. This compensates the way users generally interpret "read from a starting > position". 
As the Flink Kafka connector is somewhat essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I > think adding this "decouples" more Flink's internal offset checkpointing from > the external Kafka's offset store. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523327#comment-15523327 ] ASF GitHub Bot commented on FLINK-4035: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r80496915 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java --- @@ -60,12 +60,17 @@ protected void assignPartitionsToConsumer(KafkaConsumer consumer consumer.assign(topicPartitions); } + @Override + protected void emitRecord(T record, KafkaTopicPartitionState partition, long offset, ConsumerRecord consumerRecord) throws Exception { + // pass timestamp + super.emitRecord(record, partition, offset, consumerRecord.timestamp()); + } + /** * Emit record Kafka-timestamp aware. */ @Override - protected void emitRecord(T record, KafkaTopicPartitionState partitionState, long offset, R kafkaRecord) throws Exception { - long timestamp = ((ConsumerRecord) kafkaRecord).timestamp(); + protected void emitRecord(T record, KafkaTopicPartitionState partitionState, long offset, long timestamp) throws Exception { if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { --- End diff -- Yes, I'll do that. > Bump Kafka producer in Kafka sink to Kafka 0.10.0.0 > --- > > Key: FLINK-4035 > URL: https://issues.apache.org/jira/browse/FLINK-4035 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Minor > > Kafka 0.10.0.0 introduced protocol changes related to the producer. > Published messages now include timestamps and compressed messages now include > relative offsets. As it is now, brokers must decompress publisher compressed > messages, assign offset to them, and recompress them, which is wasteful and > makes it less likely that compression will be used at all. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2509 Hi @gyfora, Yes, it is absolutely possible to add that. There's actually a JIRA for that feature too ([FLINK-3123](https://issues.apache.org/jira/browse/FLINK-3123)), so I'd say we can add that feature on top of the proposed changes here, as a separate follow up PR after this one? One note though, the API for that feature would need to be able to specify offsets for partitions of different topics, since the Kafka consumers can subscribe multiple topics. So, `Map` wouldn't fit this case, probably would be better off having a new user-facing class as the argument to define the offsets. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
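To illustrate the kind of user-facing type hinted at above, a small sketch keyed by (topic, partition) is given below, so start offsets can be defined across several subscribed topics. The class and method names are hypothetical, not part of the Flink Kafka connector API.
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SpecificStartOffsetsSketch {

    /** Hypothetical (topic, partition) key. */
    static final class TopicPartitionKey {
        final String topic;
        final int partition;

        TopicPartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartitionKey)) {
                return false;
            }
            TopicPartitionKey that = (TopicPartitionKey) o;
            return partition == that.partition && topic.equals(that.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    private final Map<TopicPartitionKey, Long> offsets = new HashMap<>();

    /** Records the offset from which the given partition should start. */
    public SpecificStartOffsetsSketch setOffset(String topic, int partition, long offset) {
        offsets.put(new TopicPartitionKey(topic, partition), offset);
        return this;
    }

    /** Returns the configured start offset, or null if none was set. */
    public Long getOffset(String topic, int partition) {
        return offsets.get(new TopicPartitionKey(topic, partition));
    }

    public static void main(String[] args) {
        SpecificStartOffsetsSketch startOffsets = new SpecificStartOffsetsSketch()
                .setOffset("topic-a", 0, 23L)
                .setOffset("topic-b", 3, 31L);

        System.out.println(startOffsets.getOffset("topic-a", 0)); // 23
        System.out.println(startOffsets.getOffset("topic-b", 3)); // 31
    }
}
{code}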
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r80491475 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java --- @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Properties; + +import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList; + + +/** + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x + * + * Implementation note: This producer is a hybrid between a regular regular sink function (a) + * and a custom operator (b). + * + * For (a), the class implements the SinkFunction and RichFunction interfaces. + * For (b), it extends the StreamTask class. + * + * Details about approach (a): + * + * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the + * DataStream.addSink() method. + * Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record + * the Kafka 0.10 producer has a section invocation option, approach (b). + * + * Details about approach (b): + * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the + * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafka() method, the Kafka producer + * can access the internal record timestamp of the record and write it to Kafka. 
+ * + * All methods and constructors in this class are marked with the approach they are needed for. + */ +public class FlinkKafkaProducer010 extends StreamSink implements SinkFunction, RichFunction { + + /** +* Flag controlling whether we are writing the Flink record's timestamp into Kafka. +*/ + private boolean writeTimestampToKafka = false; + + // -- "Constructors" for timestamp writing -- + + /** +* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to +* the topic. +* +* This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) +* +* @param inStream The stream to write to Kafka +* @param topicId ID of the Kafka topic. +* @param serializationSchema User defined serialization schema supporting key/value messages +* @param producerConfig Properties with the producer configuration. +*/ + public static FlinkKafkaProducer010Configuration writeToKafka(DataStream inStream, +