1. 首页
  2. > 香港公司年审 >

流式处理集群是什么意思(pix4d集群处理)

这篇文章将介绍Flink的常见流式应用组件,介绍Flink的数据转换(Data Tr处理ansformation)和分区转换(Partitioning Transformation)操作。


一 从Hello World开始

我们从一个最简单的Hello World开始,体验一下DataStream API流式编程的应用。接下来我们详细介绍一下这些步骤。


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

同时你也可以像下面这样显式指定本地或远程执行环境:


// 创建一个本地的流式执行环境 LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1); // 创建一个远程的流式执行环境 StreamExecutionEnvironment remoteEnvironment = StreamExecutionEnvironment .createRemoteEnvironment("bigdata14", // JobManager的主机名 6123, // JobManager的端口号 "filepath//wordcount.jar"); // 需要传输到JobManager的JAR包

1.2 从数据源读取数据

在配置完执行环境后,我们就可以着手来做处理数据流的实质性工作。


1.2.1 从集合中读取数据

// 从集合读取数据 @Test public void sourceFromCollection() throws Exception { env.setParallelism(1); // 1.Source:从集合读取数据 DataStream<SensorReading> sensorDataStream = env.fromCollection(Arrays.asList( new SensorReading("sensor_158", 1622968229303L, 55.81297993454168), new SensorReading("sensor_77", 1622968229302L, 66.2215958248273), new SensorReading("sensor_59", 1622968229301L, 73.85444849219401) )); // 2.打印输出 sensorDataStream.print("data"); // 3.执行 env.execute(); }

1.2.2 从Element中读取数据

// 从Element中读取数据 @Test public void sourceElement() throws Exception { env.setParallelism(1); DataStream<integer> integerDataStream = env.fromElements(1, 3, 20, 45, 251); IntegerDataStream.print("int"); env.execute(); }

1.2.3从Kafka中读取数据

需要引入kafka连接器的依赖


<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>1.10.3</version> </dependency>

具体代码如下


private StreamExecutionEnvironment env = null; private Properties properties = new Prop是什么流式erties(); private static final String brokerList = "bigdata12:9092,bigdata13:9092,bigdata14:9092"; private static final String topicName = "topic-demo"; // 创建执行环境 @Before public void initEnvironment() { env = StreamExecutionEnvironment.getExecutionEnvironment(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); } // 从kafka中消费数据 @Test public void sourceFromKafka() throws Exception { DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>(topicName, new SimpleStringSchema(), properties)); // 打印输出 dataStream.print(); // 执行 env.execute(); }

1.2.4 自定义Source

除了以上的 source 数据来源,我们还可以自定义 source。需要做的,只是传入一个 RichParallelSourceFunction 就可以。具体调用如下:


DataStream<SensorReading> sensorStream = env.addSource(new SensorSource());

我们希望可以随机生成传感器数据, SensorSource 具体的代码实现如下:


public class SensorSource extends RichParallelSourceFunction<SensorReading> { // 标记是否在运行 private boolean running = true; public void run(SourceContext<SensorReading> ctx) throws Exception { DecimalFormat df = new DecimalFormat("#.0"); // 初始化随机生成器 Random random = new Random(); // 并行任务编号 int indexOfThisSubtask = this.getRuntimeContext().getIndexOfThisSubtask(); // 初始化 sensorIds 和 temperatures String[] sensorIds = new String[10]; double[] curFTemp = new double[10]; for (int i = 0; i < 10; i ) { sensorIds[i] = "sensor_" (indexOfThisSubtask * 10 i); curFTemp[i] = 65 (random.nextGaussian() * 20) ; } while (running) { // 获取当前时间 long curTimeInMillis = System.currentTimeMillis()/1000; // emit SensorReadings for (int i = 0; i < 10; i ) { // update current temperature 意思 curFTemp[i] = random.nextGaussian() * 0.5; // 收集SensorReading数据 ctx.collect(new SensorReading(sensorIds[i], curTimeInMillis, Double.parseDouble(df.format((curFTemp[i]-32)/1.8)))); } // sleep 100 ms Thread.sleep(100); } } // 取消调用 SourceFunction public void cancel() { this.running = false; } }

二转换操作(Transformations)

一旦得到了 DataStream pix4d对象,我们就可以对它应用转换。转换的类型多种多样:有些会生成一个新的 DataStream (类型可能会发生变化);而另外的一些不会修改 DataStream 中的记录,仅会通过分区或分组的方式将其重新组织。应用程序的逻辑是通过一系列转换来定义的。


2.1 基本转换

基本转换会单独处理每个事件,这意味着每条输出记录都由单条输入记录所生成。


2.1.1 MAP

通过调用 DataStream.map() 方法可以指定 map 转换产生一个新的DataStream。该转换将每个到来的事件传给一个用户自定义的映射器,后者针对每个输入只会返回一个(可能类型发生改变的)输出事件 。


图5-1 所示的 map 转换会将每个方形输入转换为圆形。


图5-1 map转换


MapFunction 的类型是根据输入和输出事件的类型来类型化的,可以使用MapFunction接口指定。 该接口定义了一个map()方法,该方法将输入事件转换为一个输出事件。


public interface MapFunction<T, O> extends Function, Serializable { O map(T value) throws Exception; }

下方是一个简单示例,把String类型的字符串转换成长度输出。


// 把String转换成长度输出 @Test public void mapFunction() throws Exception { DataStream<Integer> mapStream = dataStream.map(new MapFunction<SensorReading, Integer>() { public Integer map(SensorReading value) throws Exception { return value.toString().length(); } }); mapStream.print(); env.execute(); }

2.1.2FILTER

filter通过计算每个输入事件的布尔条件,来决定丢弃或转发流中的事件。 返回值为true会保留输入事件并将其转发到输出,而如果返回值为false则会导致事件被丢弃。 通过调用DataStream.filter()方法来指定对一个过滤器转换的使用。 图5-2 显示了一个仅保留白色方块的过滤操作。


图5-2 filter转换


布尔条件可以使用FilterFunction接口或lambda函数任一方式实现为一个UDF。 FilterFunction根据输入流的类型进行类型化,并定义filter()方法,该方法使用输入事件调用,并返回一个布尔值。


public interface FilterFunction<T> extends Function, Serializable { boolean filter(T value) throws Exception; }

下面例子所展示的filter是筛选sensor_1开头的id对应的数据:


// 筛选sensor_1开头的id对应的数据 @Test public void filterFunction() throws Exception { DataStream<SensorReading> filterStream = dataStream.filter(new FilterFunction<SensorReading>() { public boolean filter(SensorReading value) throws Exception { return value.toString().startsWith("sensor_1"); } }); filterStream.print(); env.execute(); }

2.1.3FLATMAP

flatMap 转换类似于 map ,但它可以对每个输入事件产生零个、 一个或多个输出事件。事实上,flatMap转换可以看做是 fiIter 和 map 的泛化,它能够实现后两者的操作。 图5-3 展示了一个flatMap操作,会根据输入事件颜色的不同输出不同的结果。具体而言,它会将白色方块不将改动直接输出,将黑色方块复制,将灰色方块丢弃。


图5-3 flatMap


flatMap转换对每个传入事件应用UDF。对应的FlatMapFunction定义了一个flatMap()方法,该方法可以通过将它们传递给Collector对象来返回零个,一个或多个事件。


public interface FlatMapFunction<T, O> extends Function, Serializable { void flatMap(T value, Collector<O> out) throws Exception; }

flatmap,按逗号分隔字段


// flatmap,按逗号分隔字段 @Test public void flatMapFunction() throws Exception { DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<SensorReading, String>() { public void flatMap(SensorReading value, Collector<String> out) throws Exception { String[] fields = value.toString().split(","); for (String field : fields) { out.collect(field); } } }); flatMapStream.print(); env.execute(); }

2.2 基于KeyedStream的转换

很多应用需要将事件按照某个属性分组后再进行处理。KeyedStream可以从逻辑上将事件按照键值分配到多条独立的子流中。作用于KeyedStream的状态化转换,可以对当前处理事件的键值所对应上下文中的状态进行读写。这意味着所有键值相同的事件可以访问相同的状态,因此它们可以被一并处理。


下面我们将使用 keyBy 转换将一个 DataStream 转化为 KeyedStream,然后对它进行滚动聚合以及Reduce操作。


2.2.1KEYBY

keyBy 转换通过指定键值的方式将一个 DataStream 转化为 KeyedStream。事件会按照键值被分到不同的分区,相同键值的事
件一定会在后续算子的同一个任务上处理。虽然键值不同的事件也可能会在同一个任务上处理,但任务函数所能访问的键值分区状态,始终会被约束在当前事件键值的范围内。


假设以输入事件的颜色作为键值,图5-4 中将所有黑色事件分到一个任务,而将其他事件分到另一个任务 。


图5-4 keyBy


DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。


以下代码针对 SensorReading 记录流将其中的 id 字段声明为键值:


DataStream<String> inputStream = env.readTextFile("sensor.txt"); // 使用mapfunction转换 DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() { public SensorReading map(String value) throws Exception { String[] fields = value.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); } }); // 分组 KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

2.2.2 滚动聚合(ROLLING AGGREGATIONS)

滚动聚合转换作用于 KeyedStream 上,它将生成一个包含聚合结果(例如求和、最小值、最大值等)的 DataStream。该动聚合算子会对每一个遇到过的键值保存一个聚合结果。每当有新事件到来,该算子都会更新相应的聚合结果,并将其以事件的形式发送出去。DataStream API 中提供了以下滚动聚合方法:


2.2.2.1 sum()

滚动计算输入流中指定字段的和。


2.2.2.2 min()

滚动计算输入流中指定字段的最小值。


2.2.2.3 max()

滚动计算输入流中指定字段的最大值。


2.2.2.4 minBy()

滚动计算输入流中迄今为止最小值,返回该值所在的事件。


2.2.2.5 maxBy()

滚动计算输入流中迄今为止最大值,返回该值所在的事件。


无法将多个滚动聚合方法组合使用,每次只能计算一个。


续keyBy的例子:


// 分组 KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id"); DataStream<SensorReading> maxTemperature = keyedStream.max("temperature"); DataStream<SensorReading> maxByTemperature = keyedStream.maxBy("temperature"); DataStream<SensorReading> sumTemperature = keyedStream.sum("temperature"); maxTemperature.print("maxTemperature"); maxByTemperature.print("maxByTemperature"); sumTemperature.print("sumTemperature");

2.2.2.6只对有限键值域使用滚动聚合

滚动聚合会为每个处理过的键值维持一个状态。由于这些状态不会被自动清理,所以该算子只能用于有限流的计算。


2.2.3 Reduce

Reduce转换是滚动聚合转换的泛化。它将一个ReduceFunction应用在一个KeyedStream上, 每个到来的事件都会和Reduce结果进行一次组合,从而产生一个新的DataStream。Reduce转换不会改变数据类型,因此输出流的类型会永远和输入流保持一致。处理


可以使用实现了ReduceFunction接口的类来指定UDF。 ReduceFunction定义了reduce()方法,该方法接受两个输入事件并返回相同类型的事件。


KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。


样例:reduce转换,取当前时间,每个传感器最高的温度值。


// 分组 KeyedStream<SensorReading, Tupix4dple> keyedStream = dataStream.keyBy("id"); // reduce转换,取当前时间,每个传感器最高的温度值 DataStream<SensorReading> reduceMaxTemperature = keyedStream.reduce(new ReduceFunction<SensorReading>() { @Override public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception { return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature())); } }); reduceMaxTemperature.print(); env.execute();

只对有限键值域使用Reduce操作

Reduce会为每个处理过的键值维持一个状态。由于这些状态不会被自动清理,所以该算子只能用于有限流的计算。


2.3多流转换

很多应用需要将多条输入流联合起来处理,或将一条流分割成多条子流以应用不同逻辑。接下来我们将讨论那些能同时处理多条输入流或产生多条结果流的 DataStream API 转换。


2.3.1 UNION

DataStream.union()方法可以合并两条或多条类型相同的DataStream,生成一个新的类型相同的DataStream。这样后续的转换操作就可以对所有输入流中的元素统一处理。图5-5 展示的union操作会将黑色和白色的事件合并成一条输出流。


图5-5 UNION


union执行过程中,来自两条流的事件会以FIFO(先进先出)的方式合并,其顺序无法得到任何保证。此外,union算子不会对数据进行去重,每个输入消息都会被发往下游算子。


DataStream<SensorReading> unionStream =集群 highTempStream.union(lowTempStream);

2.3.2 Connect、CoMap和COFLATMAP

2.3.2.1 Connect

DataStream.connect()方法接收一个DataStream并返回一个ConnectedStream对象,该对象表示两个联结起来(connected)的流。DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。



2.3.2.2 CoMap,CoFlatMap

ConnectedStreams对象提供了map()和flatMap()方法 ,它们分别接收一个CoMapFunction 和一个CoFlatMapFunction作为参数。


两个函数都是以两条输入流的类型外加输出流的类型作为其类型参数,它们为两条输入流定义了各自的处理方法。map1()和 flatMap1()用来处理第一条输入流的事件,map2()和flatMap2()用来处理第二条输入流的事件:


// 合流 connect 处理 @Test public void connectStream() throws Exception { DataStream<SensorReading> sensorReading = env.addSource(new SensorSource()); // 火险等级 DataStream<SmokeLevel> smokeLevel = env.addSource(new SmokeLevelSource()); // 按照sensor.id进行分组 KeyedStream<SensorReading, Tuple> keyedSensorReadingStream = sensorReading.keyBy("id"); DataStream<Alert> alertDataStream = keyedSensorReadingStream.connect(smokeLevel.broadcast()).flatMap(new CoFlatMapFunction<SensorReading, SmokeLevel, Alert>() { private SmokeLevel smokeLevel = SmokeLevel.LOW; @Override public void flatMap1(SensorReading value, Collector<Alert> out) { if (this.smokeLevel == SmokeLevel.HIGH && value.getTemperature() > 100) { out.collect(new Alert("Risk of fire! " value, (long) value.getTemperature())); } else { out.collect(new Alert("normal! " value, (long) value.getTemperature())); } } @Override public void flatMap2(SmokeLevel value, Collector<Alert> out) { // 更新火险等级 this.smokeLevel = value; } }); alertDataStream.print(); env.execute(); }

2.3.3.3函数无法选择从哪条流读取数据

CoMapFunction函数无法选择从哪条流读取数据,同时也无法控制CoFlatMapFunction内方法的调用顺序。一旦对应流中有事件到来,系统就需要调用相应的方法。


2.3.3 Connect 与 Union 区别

  1. Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
  2. Connect 只能操作两个流,Union可以操作多个。

2.4 分发转换(Distribution Transformations)

如果DataStream的并行分区存在倾斜现象,那么可能就希望通过重新平衡数据来均匀分配后续算子的负载。我们接下来介绍用于控制分区策略或自定义分区策略的方法。


2.4.1 随机(Random)

可以利用DataStream.suffle()方法实现随机数据交换策略。这个方法会把数据均匀并且随机地发送给后续的并行算子。


2.4.2轮询(Round-Robin)

rebalance()方法会将输入流中的事件以轮询方式均匀分配给后继任务,如图5-7所示。


2.4.3 重调(Rescale)

Rescale也会以轮流方式对事件进行分发,但分发目标仅限于部分后继任务。重调分区策略为发送端和接收端任务不等的情况提供了一种轻量级的负载均衡方法。当接收端任务远大于发送端任务的时候,这个方法会更有效,反之亦然。


rebalance()会在所有发送任务和接收任务之间建立通信通道;而rescale()中每个发送任务只会和下游算子的部分任务建立通道。 图5-7 展示了重调分发转换的连接模式。


图5-7 rebalance和rescale


2.4.4 广播(Broadcast)

Broadcast方法会将输入流中的事件复制并发往所有下游算子的并行任务。


三 设置并行度

当提交一个DataStream程序到JobManager上执行时,系统会生成一个Dataflow图并准备好用于执行的算子。每个算子都会产生一个或多个并行任务。每个任务负责处理算子的部分输入流。算子并行化任务的数目称为该算子的并行度。它决定了算子处理的并行化程度以及能够处理的数据规模。


最好将算子并行度设置为随环境默认并行度变化的值。这样就可以通过提交客户端来轻易调整并行度,从而实现应用的扩缩容。你可以按照下面的示例来访问环境的默认并行度:


// 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取默认并行度 int parallelism = env.getParallelism();

也可以覆盖环境的默认并行度意思,设置之后,将无法通过提交客户端控制应用并行度:


// 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置环境并行度 env.setParallelism(32);

也可以通过显式指定的方式来覆盖算子的默认并行度。下面的示例中,数据源算子会以环境默认并行度执行, map转换的任务数是数据源的两倍,sink操作固定以两个并行任务执行:


// 获取默认并行度 int parallelism = env.getParallelism(); // 数据源以默认并行度执行 env.addSource(new SensorSource()) // 设置map的并行度为默认并行度的2倍 .map(new MapFunction<SensorReading, Long>() { @Override public Long map(SensorReading value) throws Exception { return value.getTimestamp(); } }).setParallelism(parallelism*2) // print并行度设置为2 .print().setParallelism(2);

四 支持的数据类型

Flink 流应用程序处理的是以数据对象表示的事件流。 所以在 Flink 内部, 我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点, Flink 需要明确知道应用程序所处理的数据类型。 Flink 使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。


Flink 还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。


Flink 支持 Java 和 Scala 所有常见数据类型。使用最广泛的类型有以下几种。


4.1基础数据类型

Flink 支持所有的 Java 和 Scala 基础数据类型, Int, Double, Long, String, …


DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4); numberStream.map(data -> data * 2);

4.2Java 和 Scala 元组(Tuples)

DataStream<Tuple2<String, Integer>> personStream = env.fromElements( new Tuple2("Adam", 17), new Tuple2("Sarah", 23) ); personStream.filter(p -> p.f1 > 18);

4.3 Scala 样例类(case classes)

case class Person(name: String, age: Int) val persons: DataStream[Person] = env.fromElements( Person("Adam", 17), Person("Sarah", 23) ) persons.filter(p => p.age > 18)

4.4Java 简单对象(POJOs)

// 传感器温度读数的数据类型 public class SensorReading { // 属性:id,时间戳,温度值 private String id; private long timestamp; private double temperature; public SensorReading() { } }

4.5 其它(Arrays, Lists, Maps, Enums, 等等)

Flink 对 Java 和 Scala 中的一些特殊目的的类型也都是支持的,比如 Java 的ArrayList, HashMap, Enum 等等。


五实现函数

到目前为止,你已经在本章的代码示例中见到过如f可使用用户自定义函数。我们会对在 DataStream API 中定义和参数化函数的各种方式进行一个更加详细的解释。

是什么

5.1 函数类(Function Classes)

Flink 暴露了所有UDF函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction 等等。
下面的例子实现了 FilterFunction 接口:


DataStream<String> flinkTweets = tweets.filter(new FlinkFilter()); public static class FlinkFilter implements FilterFunction<String> { @Override public boolean filter(String value) throws Exception { return value.contains("flink"); } }

还可以将函数实现成匿名类


DataStream<String> flinkTweets = tweets.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { return value.cont集群ains("flink"); } });

我们 filter 的字符串"flink"还可以当作参数传进去


DataStream<String> tweets = env.readTextFile("INPUT_FILE"); DataStream<String> flinkTweets = tweets.filter(new KeyWordFilter("flink")); public static class KeyWordFilter implements FilterFunction<String> { private String keyWord; KeyWordFilter(String keyWord) { this.keyWord = keyWord; } @Override public boolean filter(String value) throws Exception { return value.contains(this.keyWord); } }

5.2 匿名函数(Lambda Functions)

DataStream<String> tweets = env.readTextFile("INPUT_FILE"); DataStream<String> flinkTweets = tweets.filter( tweet -> tweet.contains("flink") );

5.3 富函数(Rich Functions)

“富函数”是 DataStream API 提供的一个函数类的接口, 所有 Flink 函数类都有其 Rich 版本。 它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。


  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction

    Rich 流式Function 有一个生命周期的概念。 典型的生命周期方法有:
  • open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。
  • close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
  • getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态。

public static class MyMapFunction extends RichMapFunction<SensorReading,Tuple2<Integer, String>> { @Override public Tuple2<Integer, String> map(SensorReading value) throws Exception { return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(),value.getId()); } @Override public void open(Configuration parameters) throws Exception { System.out.println("my map open"); // 以下可以做一些初始化工作,例如建立一个和 HDFS 的连接 } @Override public void close() throws Exception { System.out.println("my map close"); // 以下做一些清理工作,例如断开和 HDFS 的连接 } }


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至123456@qq.com 举报,一经查实,本站将立刻删除。

联系我们

工作日:9:30-18:30,节假日休息