[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-17 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r426343289



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and persist data at the same time.
+ *
+ * Persisting shuffle data is useful when
+ * - you would like to reuse the shuffle data and/or,
+ * - you would like to avoid a full restart of a pipeline during failure recovery
+ *
+ * Persisting shuffle is achieved by wrapping a {@link FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * {@code
+ * StreamExecutionEnvironment env = ...                 // create execution environment
+ * DataStream source = env.addSource(...)               // add data stream source
+ * DataStream dataStream = ...                          // some transformation(s) based on source
+ *
+ * KeyedStream keyedStream = FlinkKafkaShuffle
+ *     .persistentKeyBy(                                // keyBy shuffle through kafka
+ *         dataStream,                                  // data stream to be shuffled
+ *         topic,                                       // Kafka topic written to
+ *         producerParallelism,                         // the number of tasks of a Kafka Producer
+ *         numberOfPartitions,                          // the number of partitions of the Kafka topic written to
+ *         kafkaProperties,                             // kafka properties for Kafka Producer and Consumer
+ *         keySelector);                                // key selector to retrieve key from `dataStream'
+ *
+ * keyedStream.transform...                             // some other transformation(s)
+ *
+ * KeyedStream keyedStreamReuse = FlinkKafkaShuffle
+ *     .readKeyBy(                                      // read the Kafka shuffle data again for other usages
+ *         topic,                                       //
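The quoted Javadoc example breaks off above. A hedged, self-contained sketch of the end-to-end usage it describes follows; the `persistentKeyBy`/`readKeyBy` signatures are inferred from the inline comments in the hunk, it assumes the class ends up publicly reachable, and the topic name, parallelism values, key selector, and Kafka properties are purely illustrative:

```java
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle;

public class KafkaShuffleUsageSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties kafkaProperties = new Properties();                       // illustrative broker config
		kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");

		DataStream<Long> dataStream = env.generateSequence(0, 999);          // stand-in for real transformations

		// keyBy shuffle through Kafka: persists the shuffled data and returns a KeyedStream
		KeyedStream<Long, Long> keyedStream = FlinkKafkaShuffle.persistentKeyBy(
			dataStream,
			"shuffle-topic",       // Kafka topic written to (illustrative)
			4,                     // producerParallelism
			8,                     // numberOfPartitions of the topic
			kafkaProperties,
			value -> value % 8);   // key selector

		keyedStream.map(value -> value).name("downstream work");

		// read the persisted shuffle data again for another pipeline
		KeyedStream<Long, Long> keyedStreamReuse = FlinkKafkaShuffle.readKeyBy(
			"shuffle-topic",
			env,
			TypeInformation.of(Long.class),
			kafkaProperties,
			value -> value % 8);
		keyedStreamReuse.print();

		env.execute("kafka-shuffle-usage-sketch");
	}
}
```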

[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-16 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r426143357



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
+   private final TypeSerializer<T> serializer;
+   private final int producerParallelism;
+
+   FlinkKafkaShuffleConsumer(String topic, TypeInformationSerializationSchema<T> schema, Properties props) {

Review comment:
   I will mark this resolved for now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-16 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r426143294



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
+   private final TypeSerializer<T> serializer;
+   private final int producerParallelism;
+
+   FlinkKafkaShuffleConsumer(String topic, TypeInformationSerializationSchema<T> schema, Properties props) {

Review comment:
   I guess the main point here is to avoid exposing `TypeInformationSerializationSchema` to users. `FlinkKafkaShuffleConsumer` is an internal function and is wrapped under KafkaShuffle, so I guess it is fine to keep `TypeInformationSerializationSchema` here.
   The schema is needed to call the right `FlinkKafkaConsumer` constructor.
   The schema is never used and could be `null`, but `null` confuses the compiler.
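To illustrate the "`null` confuses the compiler" point: `FlinkKafkaConsumer` has constructor overloads taking a `DeserializationSchema` and a `KafkaDeserializationSchema`, so a bare `null` argument is ambiguous, while a typed schema reference (or an explicit cast) resolves the overload. A minimal sketch; the subclass name is hypothetical:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Hypothetical subclass showing why the schema argument cannot simply be `null`.
class NullSchemaDemoConsumer<T> extends FlinkKafkaConsumer<T> {
	NullSchemaDemoConsumer(String topic, TypeInformationSerializationSchema<T> schema, Properties props) {
		// Compiles: the static type of `schema` selects the DeserializationSchema overload.
		super(topic, schema, props);

		// Would NOT compile: `null` matches both the DeserializationSchema and the
		// KafkaDeserializationSchema constructor overloads.
		//   super(topic, null, props);

		// An explicit cast would disambiguate, at the cost of readability:
		//   super(topic, (DeserializationSchema<T>) null, props);
	}
}
```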









[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-16 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r426142494



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> {

Review comment:
   removed the duplicated code.
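For context, the `TAG_REC_WITH_TIMESTAMP`/`TAG_REC_WITHOUT_TIMESTAMP`/`TAG_WATERMARK` constants imported in the hunk above imply that every Kafka message carries a leading tag so the fetcher can tell records apart from watermarks. A minimal sketch of such framing; the tag values and the exact byte layout are assumptions, not the PR's actual encoding:

```java
import java.io.IOException;

import org.apache.flink.core.memory.DataInputDeserializer;

// Assumed tag values; the real constants live in FlinkKafkaShuffleProducer.KafkaSerializer.
final class ShuffleFramingSketch {
	static final int TAG_REC_WITH_TIMESTAMP = 0;
	static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
	static final int TAG_WATERMARK = 2;

	/** Reads the leading tag and dispatches on it, mirroring what the fetcher must do. */
	static String describe(byte[] kafkaValue) throws IOException {
		DataInputDeserializer in = new DataInputDeserializer(kafkaValue);
		int tag = in.readInt(); // assumption: the tag is serialized as an int before the payload
		switch (tag) {
			case TAG_REC_WITH_TIMESTAMP:
				return "record, timestamp=" + in.readLong();
			case TAG_REC_WITHOUT_TIMESTAMP:
				return "record, no timestamp";
			case TAG_WATERMARK:
				return "watermark from producer subtask " + in.readInt();
			default:
				throw new IllegalStateException("Unknown tag: " + tag);
		}
	}
}
```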









[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-16 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r426140306



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> {
+   private static final Logger LOG = LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+   /** The handler to check and generate watermarks from fetched records. **/
+   private final WatermarkHandler watermarkHandler;
+
+   /** The schema to convert between Kafka's byte messages, and Flink's objects. */
+   private final KafkaShuffleElementDeserializer deserializer;
+
+   /** Serializer to serialize record. */
+   private final TypeSerializer<T> serializer;
+
+   /** The handover of data and exceptions between the consumer thread and the task thread. */
+   private final Handover handover;
+
+   /** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
+   private final KafkaConsumerThread consumerThread;
+
+   /** Flag to mark the main work loop as alive. */
+   private volatile boolean running = true;
+
+   public KafkaShuffleFetcher(
+           SourceFunction.SourceContext<T> sourceContext,
+           Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+           SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+           SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+           ProcessingTimeService processingTimeProvider,
+           long autoWatermarkInterval,
+           ClassLoader userCodeClassLoader,
+           String taskNameWithSubtasks,
+           TypeSerializer<T> serializer,

[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-16 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r426139686



##
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
##
@@ -264,7 +264,7 @@ public FlinkKafkaConsumerBase(
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode

Review comment:
   Do you mean remove the "-"?
   I did not change this code.









[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-16 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r426129684



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
##
@@ -192,6 +176,29 @@ protected String getFetcherName() {
return "Kafka Fetcher";
}
 
+   protected void partitionConsumerRecordsHandler(
+           List<ConsumerRecord<byte[], byte[]>> partitionRecords,
+           KafkaTopicPartitionState<TopicPartition> partition) throws Exception {
+
+       for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+           deserializer.deserialize(record, kafkaCollector);
+
+           // emit the actual records. this also updates offset state atomically and emits watermarks
+           emitRecordsWithTimestamps(
+               kafkaCollector.getRecords(),
+               partition,
+               record.offset(),
+               record.timestamp());
+
+           if (kafkaCollector.isEndOfStreamSignalled()) {
+               // end of stream signaled
+               running = false;
+               break;
+           }

Review comment:
   > Much better now, but I would have just extracted this inside of the for loop, since the loop itself is also the same.
   
   It would be a bit tricky because it has a for-loop break; I would have to change the logic inside. At this point, I would prefer to make safer changes...
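For reference, the refactoring suggested in the quoted reply would hoist the loop itself into the base fetcher and turn the per-record body into a hook that signals end-of-stream, which is exactly the logic change being avoided here. A hedged sketch of that alternative; all names are hypothetical:

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical base-class loop shared by KafkaFetcher and KafkaShuffleFetcher;
// the subclass hook returns false once end of stream is signalled.
abstract class SharedLoopFetcher<P> {
	protected volatile boolean running = true;

	protected void partitionConsumerRecordsHandler(
			List<ConsumerRecord<byte[], byte[]>> partitionRecords, P partition) throws Exception {
		for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
			if (!handleRecord(record, partition)) { // false once end of stream is signalled
				running = false;
				break;
			}
		}
	}

	/** Per-record step each concrete fetcher implements; returns false at end of stream. */
	protected abstract boolean handleRecord(ConsumerRecord<byte[], byte[]> record, P partition)
			throws Exception;
}
```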









[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425771337



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> {
+   private static final Logger LOG = LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+   private final WatermarkHandler watermarkHandler;
+   // ------------------------------------------------------------------------
+
+   /** The schema to convert between Kafka's byte messages, and Flink's objects. */
+   private final KafkaShuffleElementDeserializer deserializer;
+
+   /** Serializer to serialize record. */
+   private final TypeSerializer<T> serializer;
+
+   /** The handover of data and exceptions between the consumer thread and the task thread. */
+   private final Handover handover;
+
+   /** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
+   private final KafkaConsumerThread consumerThread;
+
+   /** Flag to mark the main work loop as alive. */
+   private volatile boolean running = true;
+
+   public KafkaShuffleFetcher(
+           SourceFunction.SourceContext<T> sourceContext,
+           Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+           SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+           SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+           ProcessingTimeService processingTimeProvider,
+           long autoWatermarkInterval,
+           ClassLoader userCodeClassLoader,
+           String taskNameWithSubtasks,
+           TypeSerializer<T> serializer,

[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425771552



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> {

Review comment:
   Let me answer why.
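For background on the `WatermarkHandler` documented in the KafkaShuffleFetcher hunks above: since every producer subtask presumably writes its own watermark into each partition (which is why `producerParallelism` is threaded through the consumer), a partition's watermark can only advance once all producer subtasks have reported, taking the minimum. A hedged sketch of such a merge; the class and method names are hypothetical, not the PR's:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical per-partition watermark merge: tracks the latest watermark seen from
// each producer subtask and only advances once every subtask has reported.
final class PartitionWatermarkMerger {
	private final int producerParallelism;
	private final Map<Integer, Long> watermarkPerSubtask = new HashMap<>();

	PartitionWatermarkMerger(int producerParallelism) {
		this.producerParallelism = producerParallelism;
	}

	/** Records one subtask's watermark; returns the merged (minimum) watermark once complete. */
	Optional<Long> onWatermark(int subtaskIndex, long watermark) {
		watermarkPerSubtask.merge(subtaskIndex, watermark, Math::max);
		if (watermarkPerSubtask.size() < producerParallelism) {
			return Optional.empty(); // not all producer subtasks have reported yet
		}
		return Optional.of(watermarkPerSubtask.values().stream().min(Long::compare).get());
	}
}
```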









[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425767456



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way it handles elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> {
+   private final KafkaSerializer<IN> kafkaSerializer;
+   private final KeySelector<IN, KEY> keySelector;
+   private final int numberOfPartitions;
+
+   FlinkKafkaShuffleProducer(
+           String defaultTopicId,
+           TypeInformationSerializationSchema<IN> schema,
+           Properties props,
+           KeySelector<IN, KEY> keySelector,
+           Semantic semantic,
+           int kafkaProducersPoolSize) {
+       super(defaultTopicId, (element, timestamp) -> null, props, semantic, kafkaProducersPoolSize);
+
+       this.kafkaSerializer = new KafkaSerializer<>(schema.getSerializer());
+       this.keySelector = keySelector;
+
+       Preconditions.checkArgument(
+           props.getProperty(PARTITION_NUMBER) != null,
+           "Missing partition number for Kafka Shuffle");
+       numberOfPartitions = PropertiesUtil.getInt(props, PARTITION_NUMBER, Integer.MIN_VALUE);
+   }
+
+   /**
+    * This is the function invoked to handle each element.
+    * @param transaction transaction state;
+    *                    elements are written to Kafka in transactions to guarantee different levels of data consistency
+    * @param next element to handle
+    * @param context context needed to handle the element
+    * @throws FlinkKafkaException for kafka error
+    */
+   @Override
+   public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
+       checkErroneous();
+
+       // write timestamp to Kafka if timestamp is available
+       Long timestamp = context.timestamp();
+
+       int[] partitions = getPartitions(transaction);
+       int partitionIndex;
+       try {
+           partitionIndex = KeyGroupRangeAssignment
+               .assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, partitions.length);
+       } catch (Exception e) {
+           throw new RuntimeException("Fail to assign a partition number to record");
+       }
+
+       ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+           defaultTopicId, partitionIndex, timestamp, null, kafkaSerializer.serializeRecord(next, timestamp));
+       pendingRecords.incrementAndGet();
+       transaction.getProducer().send(record, callback);
+   }
+
+   /**
+    * This is the function invoked to handle each watermark.
+    *

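The `invoke()` hunk above picks the Kafka partition with `KeyGroupRangeAssignment.assignKeyToParallelOperator`, passing `partitions.length` for both the max parallelism and the parallelism. With those two equal, the chosen operator index is exactly the key's key group, so the partition number doubles as the key group index. A small runnable illustration (the partition count and keys are illustrative):

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class PartitionAssignmentDemo {
	public static void main(String[] args) {
		int numberOfPartitions = 8; // illustrative value
		for (String key : new String[] {"a", "b", "c"}) {
			// maxParallelism == parallelism == numberOfPartitions, as in the producer above
			int partition = KeyGroupRangeAssignment.assignKeyToParallelOperator(
				key, numberOfPartitions, numberOfPartitions);
			int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, numberOfPartitions);
			System.out.printf("key=%s -> partition=%d, keyGroup=%d%n", key, partition, keyGroup);
		}
	}
}
```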
[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425731246



##
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleTestBase.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+
+import org.junit.BeforeClass;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+
+/**
+ * Base Test Class for KafkaShuffle.
+ */
+public class KafkaShuffleTestBase extends KafkaConsumerTestBase {
+   static final long INIT_TIMESTAMP = System.currentTimeMillis();

Review comment:
   Why is it easier if it is not a real timestamp? I can change it back to a static number, but I feel this might be more realistic.
   
   This change was originally motivated by debugging the Kafka server: I thought some data was trimmed because the timestamp was too old, but that turned out to be unrelated.
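For background on the "trimmed because the timestamp is too old" suspicion: Kafka's time-based retention is evaluated against record timestamps, so records produced with very old `CreateTime` timestamps can become eligible for deletion almost immediately. A sketch of the topic-level settings involved; the values are illustrative only:

```java
import java.util.Properties;

public class TopicRetentionSketch {
	public static void main(String[] args) {
		// Illustrative topic-level settings that govern time-based deletion.
		Properties topicConfig = new Properties();
		topicConfig.setProperty("retention.ms", "604800000");            // keep data for ~7 days
		topicConfig.setProperty("message.timestamp.type", "CreateTime"); // retention keyed off producer timestamps
		System.out.println(topicConfig);
	}
}
```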









[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425608617



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> {
+   private static final Logger LOG = LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+   private final WatermarkHandler watermarkHandler;
+   // ------------------------------------------------------------------------
+
+   /** The schema to convert between Kafka's byte messages, and Flink's objects. */
+   private final KafkaShuffleElementDeserializer deserializer;
+
+   /** Serializer to serialize record. */
+   private final TypeSerializer<T> serializer;
+
+   /** The handover of data and exceptions between the consumer thread and the task thread. */
+   private final Handover handover;
+
+   /** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
+   private final KafkaConsumerThread consumerThread;
+
+   /** Flag to mark the main work loop as alive. */
+   private volatile boolean running = true;
+
+   public KafkaShuffleFetcher(
+           SourceFunction.SourceContext<T> sourceContext,
+           Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+           SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+           SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+           ProcessingTimeService processingTimeProvider,
+           long autoWatermarkInterval,
+           ClassLoader userCodeClassLoader,
+           String taskNameWithSubtasks,
+           TypeSerializer<T> serializer,

[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425572456



##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
##
@@ -35,9 +35,9 @@
 
	private final SinkTransformation<T> transformation;

-	@SuppressWarnings("unchecked")
	protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
-		this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());

Review comment:
   I will revert it.









[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425571792



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A customized {@link StreamOperator} for executing {@link FlinkKafkaShuffleProducer} that handles
+ * both elements and watermarks. If the shuffle sink is determined to be useful to other sinks in the future,
+ * we should abstract this operator to the data stream api. For now, we keep the operator this way to avoid
+ * a public interface change.
+ */
+@Internal
+class StreamKafkaShuffleSink<IN> extends StreamSink<IN> {
+
+   public StreamKafkaShuffleSink(FlinkKafkaShuffleProducer<IN, ?> flinkKafkaShuffleProducer) {
+       super(flinkKafkaShuffleProducer);
+   }
+
+   @Override
+   public void processWatermark(Watermark mark) throws Exception {
+   super.processWatermark(mark);
+   this.currentWatermark = mark.getTimestamp();

Review comment:
   That's a good catch! Thanks!!









[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425570222



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+   static final String PRODUCER_PARALLELISM = "producer parallelism";
+   static final String PARTITION_NUMBER = "partition number";
+
+   /**
+    * Write to and read from a kafka shuffle with the partition decided by keys.
+    * Consumers should read partitions equal to the key group indices they are assigned.
+    * The number of partitions is the maximum parallelism of the receiving operator.
+    * This version only supports numberOfPartitions = consumerParallelism.

Review comment:
   Do you mean specifically saying "producerParallelism != numberOfPartitions"?
   
   If you think of this as keyBy through a persistent channel, the producer parallelism has nothing to do with the max key group size, so I feel such a comment would be confusing.
   
   For config changes, yes, probably in the next step. Overall, this is only a first version; allowing users to set parallelism when writing pipelines might be enough.
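To make the quoted Javadoc constraint concrete: when numberOfPartitions == consumerParallelism == maxParallelism, each consumer subtask's key group range collapses to a single index, so subtask i reads exactly partition i. A small runnable check (the parallelism value is illustrative):

```java
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class ConsumerPartitionMappingDemo {
	public static void main(String[] args) {
		int numberOfPartitions = 4; // == maxParallelism == consumerParallelism here
		for (int subtask = 0; subtask < numberOfPartitions; subtask++) {
			KeyGroupRange range = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
				numberOfPartitions, numberOfPartitions, subtask);
			System.out.printf("consumer subtask %d reads key groups (= partitions) %d..%d%n",
				subtask, range.getStartKeyGroup(), range.getEndKeyGroup());
		}
	}
}
```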
   
   









[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425560889



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
+   private final TypeSerializer<T> serializer;
+   private final int producerParallelism;
+
+   FlinkKafkaShuffleConsumer(String topic, 
TypeInformationSerializationSchema<T> schema, Properties props) {

Review comment:
   That's not easy, because I have to call the super constructor of 
FlinkKafkaConsumer.
   
   The schema is required by the FlinkKafkaConsumer constructor, and it cannot 
be null even though it is not needed here (not sure):
   
   `super(topic, schema, props);`
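   
   For context, a sketch of what that constraint forces the subclass 
constructor to look like (the field wiring is my reading of the excerpt 
above, not copied from the PR):
   
   ```java
   FlinkKafkaShuffleConsumer(
           String topic,
           TypeInformationSerializationSchema<T> schema,
           Properties props) {
       // FlinkKafkaConsumer offers no schema-free constructor, so a non-null
       // schema must be passed through even if the shuffle path ignores it.
       super(topic, schema, props);
       this.serializer = schema.getSerializer();
       this.producerParallelism = PropertiesUtil.getInt(
               props, PRODUCER_PARALLELISM, Integer.MIN_VALUE);
   }
   ```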





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425559040



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T, KPH> extends AbstractFetcher<T, KPH> {

Review comment:
   That's a good question. 
   
   It is because what I really override are the fetcher's methods (how the 
fetcher fetches records, deserializes them, and emits watermarks). That's 
why I have to subclass the fetcher anyway.
   
   I tried different approaches; there is one way to avoid the duplicated 
code, but it needs a default constructor in either the consumer or the 
fetcher base class.
   
   I think that would be unsafe, and that's why it ended up like this.
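   
   As a rough illustration of what the overridden deserialization path has to 
do (the exact wire layout — tag width, field order, tag values — is my 
assumption, not taken from the PR):
   
   ```java
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.core.memory.DataInputDeserializer;
   import org.apache.flink.streaming.api.watermark.Watermark;

   // Hypothetical tag values and layout for a record/watermark union.
   final class TaggedDecodeSketch {
       static final int TAG_REC_WITHOUT_TIMESTAMP = 0;
       static final int TAG_REC_WITH_TIMESTAMP = 1;
       static final int TAG_WATERMARK = 2;

       static <T> Object decode(byte[] bytes, TypeSerializer<T> serializer) throws java.io.IOException {
           DataInputDeserializer in = new DataInputDeserializer(bytes);
           int tag = in.readInt();
           switch (tag) {
               case TAG_REC_WITHOUT_TIMESTAMP:
                   return serializer.deserialize(in);
               case TAG_REC_WITH_TIMESTAMP:
                   long timestamp = in.readLong();      // timestamp travels with the record
                   return serializer.deserialize(in);
               case TAG_WATERMARK:
                   return new Watermark(in.readLong()); // watermarks are data in the topic
               default:
                   throw new IllegalStateException("Unknown tag " + tag);
           }
       }
   }
   ```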





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r42309



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> {
+   private final KafkaSerializer<IN> kafkaSerializer;
+   private final KeySelector<IN, KEY> keySelector;
+   private final int numberOfPartitions;
+
+   FlinkKafkaShuffleProducer(
+   String defaultTopicId,
+   TypeInformationSerializationSchema<IN> schema,
+   Properties props,
+   KeySelector<IN, KEY> keySelector,
+   Semantic semantic,
+   int kafkaProducersPoolSize) {
+   super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+   this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+   this.keySelector = keySelector;
+
+   Preconditions.checkArgument(
+   props.getProperty(PARTITION_NUMBER) != null,
+   "Missing partition number for Kafka Shuffle");
+   numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+   }
+
+   /**
+* This is the function invoked to handle each element.
+* @param transaction transaction state;
+*elements are written to Kafka in transactions to 
guarantee different level of data consistency
+* @param next element to handle
+* @param context context needed to handle the element
+* @throws FlinkKafkaException for kafka error
+*/
+   @Override
+   public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+   checkErroneous();
+
+   // write timestamp to Kafka if timestamp is available
+   Long timestamp = context.timestamp();
+
+   int[] partitions = getPartitions(transaction);
+   int partitionIndex;
+   try {
+   partitionIndex = KeyGroupRangeAssignment
+   
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+   } catch (Exception e) {
+   throw new RuntimeException("Fail to assign a partition 
number to record");
+   }
+
+   ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+   defaultTopicId, partitionIndex, timestamp, null, 
kafkaSerializer.serializeRecord(next, timestamp));
+   pendingRecords.incrementAndGet();
+   transaction.getProducer().send(record, callback);
+   }
+
+   /**
+* This is the function invoked to handle each watermark.
+* 

[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425554722



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> {
+   private final KafkaSerializer<IN> kafkaSerializer;
+   private final KeySelector<IN, KEY> keySelector;
+   private final int numberOfPartitions;
+
+   FlinkKafkaShuffleProducer(
+   String defaultTopicId,
+   TypeInformationSerializationSchema<IN> schema,
+   Properties props,
+   KeySelector<IN, KEY> keySelector,
+   Semantic semantic,
+   int kafkaProducersPoolSize) {
+   super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+   this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+   this.keySelector = keySelector;
+
+   Preconditions.checkArgument(
+   props.getProperty(PARTITION_NUMBER) != null,
+   "Missing partition number for Kafka Shuffle");
+   numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+   }
+
+   /**
+* This is the function invoked to handle each element.
+* @param transaction transaction state;
+*elements are written to Kafka in transactions to 
guarantee different level of data consistency
+* @param next element to handle
+* @param context context needed to handle the element
+* @throws FlinkKafkaException for kafka error
+*/
+   @Override
+   public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+   checkErroneous();
+
+   // write timestamp to Kafka if timestamp is available
+   Long timestamp = context.timestamp();
+
+   int[] partitions = getPartitions(transaction);
+   int partitionIndex;
+   try {
+   partitionIndex = KeyGroupRangeAssignment
+   
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+   } catch (Exception e) {
+   throw new RuntimeException("Fail to assign a partition 
number to record");
+   }
+
+   ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+   defaultTopicId, partitionIndex, timestamp, null, 
kafkaSerializer.serializeRecord(next, timestamp));

Review comment:
   OK, I will do another pass. 





This is an automated message from the Apache Git Service.
To respond to the message, please log 

[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425552869



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##
@@ -210,7 +210,7 @@
/**
 * The name of the default topic this producer is writing data to.
 */
-   private final String defaultTopicId;
+   protected final String defaultTopicId;

Review comment:
   It was fine to use "package-private" previously, but I have moved all the 
KafkaShuffle-related classes to a separate package, so package-private is no 
longer sufficient.
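   
   A minimal sketch of the access rule in play (the subclass here is 
hypothetical; only `defaultTopicId` and `FlinkKafkaProducer` come from the 
diff):
   
   ```java
   package org.apache.flink.streaming.connectors.kafka.shuffle;

   import java.util.Properties;

   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

   // Compiles only because defaultTopicId is protected: package-private
   // members of FlinkKafkaProducer are invisible from the .shuffle sub-package.
   class TopicAwareProducer extends FlinkKafkaProducer<String> {
       TopicAwareProducer(String topic, Properties props) {
           super(topic, new SimpleStringSchema(), props);
       }

       String topic() {
           return defaultTopicId;
       }
   }
   ```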





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425540650



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##
@@ -210,7 +210,7 @@
/**

Review comment:
   I like "by heart", hahaha





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425539972



##
File path: flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
##
@@ -108,6 +109,27 @@ public static boolean getBoolean(Properties config, String 
key, boolean defaultV
}
}
 
+   /**
+* Flatten a recursive {@link Properties} to a first level property map.
+* In some cases, {KafkaProducer#propsToMap} for example, Properties is 
used purely as a HashTable

Review comment:
   Yep, that's why I removed the link :-)
   
   Thanks!
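   
   For reference, a minimal sketch of what such a `flatten` could look like 
(my reconstruction from the Javadoc, not the PR's actual body): 
`stringPropertyNames()` already walks the default chain, so copying those 
keys into a fresh `Properties` puts everything on the first level.
   
   ```java
   import java.util.Properties;

   public final class PropertiesFlattenSketch {
       // Copy every visible key (including ones inherited from defaults)
       // into a first-level Properties with no default chain.
       public static Properties flatten(Properties config) {
           Properties flattened = new Properties();
           for (String key : config.stringPropertyNames()) {
               flattened.setProperty(key, config.getProperty(key));
           }
           return flattened;
       }
   }
   ```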





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425044068



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> {
+   private final KafkaSerializer<IN> kafkaSerializer;
+   private final KeySelector<IN, KEY> keySelector;
+   private final int numberOfPartitions;
+
+   FlinkKafkaShuffleProducer(
+   String defaultTopicId,
+   TypeInformationSerializationSchema<IN> schema,
+   Properties props,
+   KeySelector<IN, KEY> keySelector,
+   Semantic semantic,
+   int kafkaProducersPoolSize) {
+   super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+   this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+   this.keySelector = keySelector;
+
+   Preconditions.checkArgument(
+   props.getProperty(PARTITION_NUMBER) != null,
+   "Missing partition number for Kafka Shuffle");
+   numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+   }
+
+   /**
+* This is the function invoked to handle each element.
+* @param transaction transaction state;
+*elements are written to Kafka in transactions to 
guarantee different level of data consistency
+* @param next element to handle
+* @param context context needed to handle the element
+* @throws FlinkKafkaException for kafka error
+*/
+   @Override
+   public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+   checkErroneous();
+
+   // write timestamp to Kafka if timestamp is available
+   Long timestamp = context.timestamp();
+
+   int[] partitions = getPartitions(transaction);
+   int partitionIndex;
+   try {
+   partitionIndex = KeyGroupRangeAssignment
+   
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+   } catch (Exception e) {
+   throw new RuntimeException("Fail to assign a partition 
number to record");
+   }
+
+   ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+   defaultTopicId, partitionIndex, timestamp, null, 
kafkaSerializer.serializeRecord(next, timestamp));
+   pendingRecords.incrementAndGet();
+   transaction.getProducer().send(record, callback);
+   }
+
+   /**
+* This is the function invoked to handle each watermark.
+* @param transaction transaction state;
+*  

[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-07 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r421912378



##
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
##
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * Simple End to End Test for Kafka.
+ */
+public class KafkaShuffleITCase extends KafkaConsumerTestBase {
+
+   @BeforeClass
+   public static void prepare() throws Exception {
+   KafkaProducerTestBase.prepare();
+   ((KafkaTestEnvironmentImpl) 
kafkaServer).setProducerSemantic(FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
+   }
+
+   /**
+* Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+* To test no data is lost or duplicated end-2-end with the default 
time characteristic: ProcessingTime
+*/
+   @Test(timeout = 3L)

Review comment:
   that's a good one :-)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-01 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r418596170



##
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaShuffleProducer.java
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> {
+   private final KafkaSerializer<IN> kafkaSerializer;
+   private final KeySelector<IN, KEY> keySelector;
+   private final int numberOfPartitions;
+
+   FlinkKafkaShuffleProducer(
+   String defaultTopicId,
+   TypeInformationSerializationSchema<IN> schema,
+   Properties props,
+   KeySelector<IN, KEY> keySelector,
+   Semantic semantic,
+   int kafkaProducersPoolSize) {
+   super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+   this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+   this.keySelector = keySelector;
+
+   Preconditions.checkArgument(
+   props.getProperty(PARTITION_NUMBER) != null,
+   "Missing partition number for Kafka Shuffle");
+   numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+   }
+
+   /**
+* This is the function invoked to handle each element.
+* @param transaction transaction state;
+*elements are written to Kafka in transactions to 
guarantee different level of data consistency
+* @param next element to handle
+* @param context context needed to handle the element
+* @throws FlinkKafkaException
+*/
+   @Override
+   public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+   checkErroneous();
+
+   // write timestamp to Kafka if timestamp is available
+   Long timestamp = context.timestamp();
+
+   int[] partitions = getPartitions(transaction);
+   int partitionIndex;
+   try {
+   partitionIndex = KeyGroupRangeAssignment
+   
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+   } catch (Exception e) {
+   throw new RuntimeException("Fail to assign a partition 
number to record");
+   }
+
+   ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+   defaultTopicId, partitionIndex, timestamp, null, 
kafkaSerializer.serializeRecord(next, timestamp));
+   pendingRecords.incrementAndGet();
+   transaction.producer.send(record, callback);
+   }
+
+   /**
+* This is the function invoked to handle each watermark.
+* @param transaction transaction state;
+*watermark are written to Kafka (if needed) in 
transactions
+* @param watermark watermark to handle
+* @throws FlinkKafkaException
+*/
+   

[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-01 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r418595393



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
##
@@ -1333,6 +1334,32 @@ public ExecutionConfig getExecutionConfig() {
return sink;
}
 
+   /**
+* Adds a {@link StreamShuffleSink} to this DataStream. {@link 
StreamShuffleSink} is attached with
+* {@link SinkFunction} that can manipulate watermarks.
+*
+* @param sinkFunction
+*  The object containing the sink's invoke 
function for both the element and watermark.
+* @return  The closed DataStream.
+*/
+   public DataStreamSink<T> addSinkShuffle(SinkFunction<T> sinkFunction) {

Review comment:
   Mark as resolved since the entire API is re-organized.
   
   This method is wrapped inside `FlinkKafkaShuffle`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-01 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r418595542



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
##
@@ -67,6 +68,22 @@ public SinkTransformation(
this(input, name, SimpleOperatorFactory.of(operator), 
parallelism);
}
 
+   /**
+* Creates a new {@code SinkTransformation} from the given input {@code 
Transformation}.
+*
+* @param input The input {@code Transformation}
+* @param name The name of the {@code Transformation}, this will be 
shown in Visualizations and the Log
+* @param operator The sink shuffle operator
+* @param parallelism The parallelism of this {@code SinkTransformation}
+*/
+   public SinkTransformation(

Review comment:
   Mark as resolved since the entire API is re-organized.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-01 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r418544339



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
##
@@ -52,6 +54,20 @@ default void invoke(IN value, Context context) throws 
Exception {
invoke(value);
}
 
+   /**
+* This function is called for every watermark.
+*
+* You have to override this method when implementing a {@code 
SinkFunction} to handle watermark.
+* This method has to be used together with {@link StreamShuffleSink}
+*
+* @param watermark The watermark to handle.
+* @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   default void invoke(Watermark watermark) throws Exception {

Review comment:
   Mark as resolved since the entire API is re-organized.
   
   SinkFunction is untouched in the new version.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-01 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r418544086



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamShuffleSink.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+/**
+ * A {@link StreamOperator} for executing {@link SinkFunction} that handle 
both elements and watermarks.
+ *
+ * @param <IN>
+ */
+@Internal
+public class StreamShuffleSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>

Review comment:
   Mark this as resolved since the entire API part has been re-organized. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-01 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r418543715



##
File path: flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
##
@@ -19,6 +19,7 @@
 

Review comment:
   Oh, yep, used as a HashTable, translated to a HashMap. Thanks :-)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-01 Thread GitBox


curcur commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r418541284



##
File path: flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
##
@@ -108,6 +109,27 @@ public static boolean getBoolean(Properties config, String 
key, boolean defaultV
}
}
 
+   /**
+* Flatten a recursive {@link Properties} to a first level property map.
+* In some cases, {KafkaProducer#propsToMap} for example, Properties is 
used purely as a HashMap
+* without considering its default properties.
+*
+* @param config Properties to be flatten
+* @return Properties without defaults; all properties are put in the 
first-level
+*/
+   public static Properties flatten(Properties config) {

Review comment:
   > On a second thought, wouldn't it make more sense to provide a correct 
`propsToMap` implementation? If it's only used in KafkaProducer, then we could 
fix it there. If not, I'd consider that function more useful than this 
`flatten`.
   
   This name is still confusing, since it operates on a Properties, not a map.
   
   The flattened properties are actually consumed inside the Kafka client 
lib, so it is not that easy to fix there.
   

##
File path: flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
##
@@ -108,6 +109,27 @@ public static boolean getBoolean(Properties config, String 
key, boolean defaultV
}
}
 
+   /**
+* Flatten a recursive {@link Properties} to a first level property map.
+* In some cases, {KafkaProducer#propsToMap} for example, Properties is 
used purely as a HashMap
+* without considering its default properties.
+*
+* @param config Properties to be flatten
+* @return Properties without defaults; all properties are put in the 
first-level
+*/
+   public static Properties flatten(Properties config) {

Review comment:
   > Should be covered with one test case.
   
   Sure, will add one later.
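   
   A sketch of what that one test case might check (JUnit 4, matching the 
project's test style; the test name and values are mine):
   
   ```java
   import static org.junit.Assert.assertEquals;
   import static org.junit.Assert.assertNull;

   import java.util.Properties;

   import org.apache.flink.util.PropertiesUtil;

   import org.junit.Test;

   public class PropertiesUtilFlattenTest {
       @Test
       public void flattenLiftsDefaultsToFirstLevel() {
           Properties defaults = new Properties();
           defaults.setProperty("a", "1");

           Properties config = new Properties(defaults);
           config.setProperty("b", "2");

           Properties flattened = PropertiesUtil.flatten(config);
           // Hashtable-style access does not see defaults on the original ...
           assertNull(config.get("a"));
           // ... but after flattening every property sits on the first level.
           assertEquals("1", flattened.get("a"));
           assertEquals("2", flattened.get("b"));
       }
   }
   ```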





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org