Repository: flink
Updated Branches:
  refs/heads/release-1.5 5621a1e9f -> 3df5c6685


[FLINK-8980] [e2e] Add a BucketingSink end-to-end test

This closes #5813.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3df5c668
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3df5c668
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3df5c668

Branch: refs/heads/release-1.5
Commit: 3df5c6685091cea6c50758769c37ad3d6390f94a
Parents: 5621a1e
Author: Timo Walther <twal...@apache.org>
Authored: Tue Mar 27 17:44:58 2018 +0200
Committer: Timo Walther <twal...@apache.org>
Committed: Thu Apr 19 15:12:08 2018 +0200

----------------------------------------------------------------------
 .../flink-bucketing-sink-test/pom.xml           | 112 ++++++++++++
 .../tests/BucketingSinkTestProgram.java         | 181 +++++++++++++++++++
 .../flink-dataset-allround-test/pom.xml         |   2 +
 .../pom.xml                                     |  25 +--
 .../flink/sql/tests/StreamSQLTestProgram.java   |   7 +
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/test-scripts/common.sh   |   6 +
 .../test-scripts/test_streaming_bucketing.sh    | 111 ++++++++++++
 8 files changed, 423 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml b/flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml
new file mode 100644
index 0000000..3967ae7
--- /dev/null
+++ b/flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+    -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <version>1.6-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-bucketing-sink-test</artifactId>
+       <name>flink-bucketing-sink-test</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-hadoop2</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <exclusions>
+                               <!-- Needed for proper dependency convergence -->
+                               <exclusion>
+                                       <groupId>commons-beanutils</groupId>
+                                       <artifactId>commons-beanutils</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <finalName>BucketingSinkTestProgram</finalName>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       <exclude>com.google.code.findbugs:jsr305</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <filters>
+                                                               <filter>
+                                                                       <artifact>*:*</artifact>
+                                                                       <excludes>
+                                                                               <exclude>META-INF/*.SF</exclude>
+                                                                               <exclude>META-INF/*.DSA</exclude>
+                                                                               <exclude>META-INF/*.RSA</exclude>
+                                                                       </excludes>
+                                                               </filter>
+                                                       </filters>
+                                                       <transformers>
+                                                               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       <mainClass>org.apache.flink.streaming.tests.BucketingSinkTestProgram</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
+

http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java b/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java
new file mode 100644
index 0000000..26d7e91
--- /dev/null
+++ b/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * End-to-end test for the bucketing sink.
+ *
+ * <p>Contains a simple stateful job that emits into buckets per key.
+ *
+ * <p>The stream is bounded and will complete after about a minute.
+ * The result is always constant.
+ *
+ * <p>Parameters:
+ * -outputPath Sets the path to where the result data is written.
+ */
+public class BucketingSinkTestProgram {
+
+       public static void main(String[] args) throws Exception {
+
+               ParameterTool params = ParameterTool.fromArgs(args);
+               String outputPath = params.getRequired("outputPath");
+
+               StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+               sEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+                               3,
+                               Time.of(10, TimeUnit.SECONDS)
+                       ));
+               sEnv.enableCheckpointing(4000);
+
+               final int idlenessMs = 10;
+
+               // define bucketing sink to emit the result
+               BucketingSink<Tuple4<Integer, Long, Integer, String>> sink = new BucketingSink<Tuple4<Integer, Long, Integer, String>>(outputPath)
+                       .setBucketer(new KeyBucketer());
+
+               // generate data, shuffle, perform stateful operation, sink
+               sEnv.addSource(new Generator(10, idlenessMs, 60))
+                       .keyBy(0)
+                       .map(new SubtractingMapper(-1L * idlenessMs))
+                       .addSink(sink);
+
+               sEnv.execute();
+       }
+
+       /**
+        * Use first field for buckets.
+        */
+       public static class KeyBucketer implements Bucketer<Tuple4<Integer, Long, Integer, String>> {
+
+               @Override
+               public Path getBucketPath(Clock clock, Path basePath, Tuple4<Integer, Long, Integer, String> element) {
+                       return basePath.suffix(String.valueOf(element.f0));
+               }
+       }
+
+       /**
+        * Subtracts the timestamp of the previous element from the current element.
+        */
+       public static class SubtractingMapper extends RichMapFunction<Tuple3<Integer, Long, String>, Tuple4<Integer, Long, Integer, String>> {
+
+               private final long initialValue;
+
+               private ValueState<Integer> counter;
+               private ValueState<Long> last;
+
+               public SubtractingMapper(long initialValue) {
+                       this.initialValue = initialValue;
+               }
+
+               @Override
+               public void open(Configuration parameters) {
+                       counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
+                       last = getRuntimeContext().getState(new ValueStateDescriptor<>("last", Types.LONG));
+               }
+
+               @Override
+               public Tuple4<Integer, Long, Integer, String> map(Tuple3<Integer, Long, String> value) throws IOException {
+                       // update counter
+                       Integer counterValue = counter.value();
+                       if (counterValue == null) {
+                               counterValue = 0;
+                       }
+                       counter.update(counterValue + 1);
+
+                       // save last value
+                       Long lastValue = last.value();
+                       if (lastValue == null) {
+                               lastValue = initialValue;
+                       }
+                       last.update(value.f1);
+
+                       return Tuple4.of(value.f0, value.f1 - lastValue, counterValue, value.f2);
+               }
+       }
+
+       /**
+        * Data-generating source function.
+        */
+       public static class Generator implements SourceFunction<Tuple3<Integer, Long, String>>, ListCheckpointed<Long> {
+
+               private final int numKeys;
+               private final int idlenessMs;
+               private final int durationMs;
+
+               private long ms = 0;
+
+               public Generator(int numKeys, int idlenessMs, int durationSeconds) {
+                       this.numKeys = numKeys;
+                       this.idlenessMs = idlenessMs;
+                       this.durationMs = durationSeconds * 1000;
+               }
+
+               @Override
+               public void run(SourceContext<Tuple3<Integer, Long, String>> ctx) throws Exception {
+                       while (ms < durationMs) {
+                               synchronized (ctx.getCheckpointLock()) {
+                                       for (int i = 0; i < numKeys; i++) {
+                                               ctx.collect(Tuple3.of(i, ms, "Some payload..."));
+                                       }
+                                       ms += idlenessMs;
+                               }
+                               Thread.sleep(idlenessMs);
+                       }
+               }
+
+               @Override
+               public void cancel() { }
+
+               @Override
+               public List<Long> snapshotState(long checkpointId, long timestamp) {
+                       return Collections.singletonList(ms);
+               }
+
+               @Override
+               public void restoreState(List<Long> state) {
+                       for (Long l : state) {
+                               ms += l;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml b/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml
index b247743..2592117 100644
--- a/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml
+++ b/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml
@@ -38,11 +38,13 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-core</artifactId>
                        <version>${project.version}</version>
+                       <scope>provided</scope>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${project.version}</version>
+                       <scope>provided</scope>
                </dependency>
        </dependencies>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml b/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml
index a7d09c0..f131d29 100644
--- a/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml
+++ b/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml
@@ -39,11 +39,13 @@
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-core</artifactId>
                        <version>${project.version}</version>
+                       <scope>provided</scope>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
+                       <scope>provided</scope>
                </dependency>
        </dependencies>
 
@@ -63,7 +65,7 @@
                                                        <goal>jar</goal>
                                                </goals>
                                                <configuration>
-                                                       <classifier>ClassLoaderTestProgram</classifier>
+                                                       <finalName>ClassLoaderTestProgram</finalName>
 
                                                        <archive>
                                                                <manifestEntries>
@@ -80,27 +82,6 @@
                                        </execution>
                                </executions>
                        </plugin>
-
-                       <!--simplify the name of the testing JARs for referring to them in the end-to-end test scripts-->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-antrun-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <execution>
-                                               <id>rename</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>run</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <target>
-                                                               <copy file="${project.basedir}/target/flink-parent-child-classloading-test_${scala.binary.version}-${project.version}-ClassLoaderTestProgram.jar" tofile="${project.basedir}/target/ClassLoaderTestProgram.jar" />
-                                                       </target>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
                </plugins>
        </build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
index e9e79ac..bc07d61 100644
--- a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
@@ -19,6 +19,8 @@
 package org.apache.flink.sql.tests;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -46,6 +48,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * End-to-end test for Stream SQL queries.
@@ -71,6 +74,10 @@ public class StreamSQLTestProgram {
                String outputPath = params.getRequired("outputPath");
 
                StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+               sEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+                       3,
+                       Time.of(10, TimeUnit.SECONDS)
+               ));
                sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                sEnv.enableCheckpointing(4000);
                sEnv.getConfig().setAutoWatermarkInterval(1000);

http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 26f192a..641a83b 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -38,6 +38,7 @@ under the License.
                <module>flink-parent-child-classloading-test</module>
                <module>flink-dataset-allround-test</module>
                <module>flink-stream-sql-test</module>
+               <module>flink-bucketing-sink-test</module>
        </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index fd592ff..73885e1 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -301,6 +301,12 @@ function s3_delete {
     https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+function kill_random_taskmanager {
+  KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}')
+  kill -9 "$KILL_TM"
+  echo "TaskManager $KILL_TM killed."
+}
+
 # make sure to clean up even in case of failures
 function cleanup {
   stop_cluster

http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
new file mode 100755
index 0000000..b4c3ce9
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
@@ -0,0 +1,111 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-bucketing-sink-test/target/BucketingSinkTestProgram.jar
+
+# enable DEBUG logging level to retrieve truncate length later
+sed -i -e 's/#log4j.logger.org.apache.flink=INFO/log4j.logger.org.apache.flink=DEBUG/g' $FLINK_DIR/conf/log4j.properties
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+function bucketing_cleanup() {
+
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  # restore default logging level
+  sed -i -e 's/log4j.logger.org.apache.flink=DEBUG/#log4j.logger.org.apache.flink=INFO/g' $FLINK_DIR/conf/log4j.properties
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap bucketing_cleanup INT
+trap bucketing_cleanup EXIT
+
+JOB_ID=$($FLINK_DIR/bin/flink run -d -p 4 $TEST_PROGRAM_JAR -outputPath $TEST_DATA_DIR/out/result \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running ${JOB_ID}
+
+sleep 40
+
+echo "Killing TM"
+
+# kill task manager
+kill_random_taskmanager
+
+echo "Starting TM"
+
+# start task manager again
+$FLINK_DIR/bin/taskmanager.sh start
+
+echo "Killing 2 TMs"
+
+# kill two task managers again shortly after
+kill_random_taskmanager
+kill_random_taskmanager
+
+echo "Starting 2 TMs and waiting for successful completion"
+
+# start task manager again and let job finish
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# the job should complete in under 60s because half of the work has been checkpointed
+sleep 100
+
+# get truncate information
+# e.g. "xxx xxx DEBUG xxx.BucketingSink  - Writing valid-length file for xxx/out/result8/part-0-0 to specify valid length 74994"
+LOG_LINES=$(grep -rnw $FLINK_DIR/log -e 'Writing valid-length file')
+
+# perform truncate on every line
+echo "Truncating buckets"
+while read -r LOG_LINE; do
+  PART=$(echo "$LOG_LINE" | awk '{ print $10 }' FS=" ")
+  LENGTH=$(echo "$LOG_LINE" | awk '{ print $15 }' FS=" ")
+
+  echo "Truncating $PART to $LENGTH"
+
+  dd if=$PART of="$PART.truncated" bs=$LENGTH count=1
+  rm $PART
+  mv "$PART.truncated" $PART
+done <<< "$LOG_LINES"
+
+# get all lines in pending or part files
+find ${TEST_DATA_DIR}/out -type f \( -iname "*.pending" -or -iname "part-*" \) -exec cat {} + > ${TEST_DATA_DIR}/complete_result
+
+# for debugging purposes
+#echo "Checking proper result..."
+#for KEY in {0..9}; do
+#  for IDX in {0..5999}; do
+#    FOUND_LINES=$(grep "($KEY,10,$IDX,Some payload...)" ${TEST_DATA_DIR}/complete_result | wc -l)
+#    if [ ${FOUND_LINES} != 1 ] ; then
+#      echo "Unexpected count $FOUND_LINES for ($KEY,10,$IDX,Some payload...)"
+#      PASS=""
+#      exit 1
+#    fi
+#  done
+#done
+
+check_result_hash "Bucketing Sink" $TEST_DATA_DIR/complete_result "01aba5ff77a0ef5e5cf6a727c248bdc3"

Reply via email to