[新乡化纤厂]炒股软件如何把分笔数据导出(炒股软件如何分屏)

之前的文章《万字长文深度解析WordCount,入门Flink,看这一篇就够了》运用WordCount展现了Flink程序的根本结构,本文将以股票价格事例来演示怎么运用Flink的DataStreamapi。

通过本文,你能够学到:

界说相关数据结构。Flink流处理程序的骨架。Flink的履行环境概念。自界说Source、设置时刻戳和Watermark。数据结构

Flink能处理任何可被序列化的数据结构:

根底数据类型,包含String、Integer、Boolean、Array杂乱数据结构,包含Scalacaseclass和JavaPOJO

此外,Flink也支撑Kryo序列化东西。

本例运用Scalacaseclass来界说一个股票类,该目标包含三个字段:股票代号、时刻戳和价格。实在的股票交易数据比这个更为杂乱,这儿仅仅一个简化的模型。

caseclassStockPrice(symbol:String,timestamp:Long,price:Double)

当然,假设运用Java,也能够界说一个POJO(PlainOldJavaObject),该类中各个字段或许具有public特点,或许有一个对应的getter和setter办法,且该类有一个无参数的结构函数。

publicclassStockPrice{\npublicStringsymbol;\npublicLongtimestamp;\npublicDoubleprice;\n\npublicStockPrice(){};\npublicStockPrice(Stringsymbol,Longtimestamp,Doubleprice){\n...\n};\n}

比较而言,Scala的类界说更为简练,由于Scala的编译器在编译阶段帮助生成了不少代码,Java的代码风格有些臃肿。

Flink对数据类型有以上要求,首要由于在分布式核算过程中,需求将内存中的目标序列化成可多节点传输的数据,而且能够在对应节点被反序列化成目标。

Flink流处理程序的骨架结构

依据上面的数据结构,咱们开端开发程序。下面的代码清单运用Flink对股票数据流剖析程序,该程序能够核算数据源中每支股票5秒时刻窗口内的最大值。

objectStockPriceDemo{\n\ndefmain(args:Array[String]){\n\n//设置履行环境\nvalenv=StreamExecutionEnvironment.getExecutionEnvironment\n\nenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)\n//每5秒生成一个Watermark\nenv.getConfig.setAutoWatermarkInterval(5000L)\n\n//股票价格数据流\nvalstockPriceRawStream:DataStream[StockPrice]=env\n//该数据流由StockPriceSource类随机生成\n.addSource(newStockPriceSource)\n//设置Timestamp和Watermark\n.assignTimestampsAndWatermarks(newStockPriceTimeAssigner)\n\nvalstockPriceStream:DataStream[StockPrice]=stockPriceRawStream\n.keyBy(_.symbol)\n//设置5秒的时刻窗口\n.window(TumblingEventTimeWindows.of(Time.seconds(5)))\n//取5秒内某一只股票的最大值\n.max("price")\n\n//打印成果\nstockPriceStream.print()\n\n//履行程序\nenv.execute("Computemaxstockprice")\n}\n}

Java或Scala的程序进口一般是一个静态(static)的main函数。而在Scala中,object下的变量和办法都是静态的。在main函数中,还需求界说下面几个过程:

设置运转环境。读取一到多个数据源。依据事务逻辑对数据流进行Transformation操作。将成果输出。调用作业履行函数execute。

接下来咱们对这五个过程拆解剖析。

设置履行环境

valenv=StreamExecutionEnvironment.getExecutionEnvironment

这行代码能够获取一个Flink流处理履行环境。Flink一般运转在一个集群上,履行环境是Flink程序运转的上下文,它供给了一系列作业与集群交互的办法,比方作业怎么与外部国际交互。当调用getExecutionEnvironment办法时,假设咱们是在一个集群上提交作业,则回来集群的上下文,假设咱们是在本地履行,则回来本地的上下文。本例中咱们是进行流处理,在批处理场景则要获取DataSetAPI中批处理履行环境。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)这行代码奉告履行环境运用Event-time时刻语义来进行后续时刻上的核算。Event-time语义下需求依靠Watermark机制,即收到一个Watermark后,开端对这个窗口进行核算,比Watermark更晚抵达的事情都被视为推迟数据。env.getConfig.setAutoWatermarkInterval(5000L)设置每5秒生成一个Watermark,默许情况下每200毫秒生成一个Watermark。

此外,咱们还能够设置作业的并行度、装备Checkpoint等操作。可见,履行环境是咱们与Flink交互的进口。

读取数据源

接着咱们需求运用履行环境供给的办法读取数据源,读取数据源的部分统称为Source。数据源一般是音讯行列或文件,咱们也能够依据事务需求重写数据源,比方守时爬取网络中某处的数据。在本例中,咱们运用valstockPriceRawStream:DataStream[StockPrice]=env.addSource(newStockPriceSource)来读取数据源,其间StockPriceSource随机生成了一些股票价格数据。终究生成的stockPriceRawStream是一个由StockPrice组成的DataStream数据流。

下面的代码清单展现了StockPriceSource类承继RichSourceFunction,对run办法重写,不断随机生成股票价格,生成的数据终究写入SourceContext中。

classStockPriceSourceextendsRichSourceFunction[StockPrice]{\n\nvarisRunning:Boolean=true\n\nvalrand=newRandom()\n//初始化股票价格\nvarpriceList:List[Double]=List(100.0d,200.0d,300.0d,400.0d,500.0d)\nvarstockId=0\nvarcurPrice=0.0d\n\noverridedefrun(srcCtx:SourceContext[StockPrice]):Unit={\n\nwhile(isRunning){\n//每次从列表中随机挑选一只股票\nstockId=rand.nextInt(priceList.size)\n\nvalcurPrice=priceList(stockId)+rand.nextGaussian()*0.05\npriceList=priceList.updated(stockId,curPrice)\nvalcurTime=Calendar.getInstance.getTimeInMillis\n\n//将数据源搜集写入SourceContext\nsrcCtx.collect(StockPrice("symbol_"+stockId.toString,curTime,curPrice))\nThread.sleep(rand.nextInt(10))\n}\n}\n\noverridedefcancel():Unit={\nisRunning=false\n}\n}

虽然StockPrice的数据结构中有时刻戳的字段,可是Flink并不知道哪个字段是时刻戳,所以还需求手动设置。assignTimestampsAndWatermarks(newStockPriceTimeAssigner)办法答应咱们设置时刻戳和Watermark,这样Flink就能够知道本程序的时刻戳。FlinkWatermark相关的内容将在后续文章中介绍。

下面的代码清单抽取数据源中StockPrice的timestamp字段作为该事情的时刻戳。

classStockPriceTimeAssignerextendsBoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)){\noverridedefextractTimestamp(t:StockPrice):Long=t.timestamp\n}

Transformation

此刻,咱们现已获取了一个股票价格数据流,接下来咱们就能够在数据流上进行有状况的核算了。咱们一般运用Flink供给的各类算子,运用链式调用的办法,对一个数据流进行操作。通过各Transformation算子的处理,DataStream或许被转换为KeyedStream、JoinedStream等不同的数据流结构。比较SparkRDD的数据结构,Flink的数据流结构的确愈加杂乱。

本例中,咱们依照股票代号对数据进行分组,并敞开一个5秒的时刻窗口,核算该窗口下某支股票的5秒内的最大值。

valstockPriceStream:DataStream[StockPrice]=stockPriceRawStream\n.keyBy(_.symbol)\n//设置5秒的时刻窗口\n.window(TumblingEventTimeWindows.of(Time.seconds(5)))\n//取5秒内某一只股票的最大值\n.max("price")

keyBy算子将数据流依照股票的symbol分组,相同symbol的股票数据会被归结到一同;window算子敞开了一个5秒的翻滚窗口;max算子核算这个5秒窗口内的最大值。终究咱们能够得到每支股票5秒内的最大值。

输出成果

然后咱们需求将前面的核算成果输出到外部体系,或许是一个音讯行列、文件体系或数据库,也能够自界说输出办法,输出成果的部分统称为Sink。

本例中,5秒窗口内每支股票的最大值是核算成果,是一个DataStream[StockPrice]结构的数据流。咱们调用print函数将这个数据流打印到规范输出(standardoutput)。

履行

当界说好程序的Source、Transformation和Sink的事务逻辑后,程序并不会当即履行这些算子对应的任何核算,还需求调用履行环境execute()办法来履行前面的事务逻辑。Flink是推迟履行(lazyevaluation)的,即当程序清晰调用execute()办法时,Flink会将数据流图转化为一个JobGraph,提交给JobManager,JobManager依据当时的履行环境来履行这个作业。

总结

一个Flink程序的中心事务逻辑首要包含:Source、Transformation和Sink三部分。程序的开端前要设置履行环境,最终要调用execute()办法。

Flink中心事务逻辑

整个程序的完好代码如下所示,完好程序和更多事例拜见我的GitHub:github/luweizheng/flink-tutorials

packagecom.flink.tutorials.demos.stock\n\nimportjava.util.Calendar\n\nimportorg.apache.flink.streaming.api.TimeCharacteristic\nimportorg.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor\nimportorg.apache.flink.streaming.api.windowing.time.Time\nimportorg.apache.flink.streaming.api.functions.source.RichSourceFunction\nimportorg.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext\nimportorg.apache.flink.streaming.api.scala._\nimportorg.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows,TumblingProcessingTimeWindows}\n\nimportscala.util.Random\n\nobjectStockPriceDemo{\n\n/**\n*CaseClassStockPrice\n*symbol股票代号\n*timestamp时刻戳\n*price价格\n*/\ncaseclassStockPrice(symbol:String,timestamp:Long,price:Double)\n\ndefmain(args:Array[String]){\n\n//设置履行环境\nvalenv=StreamExecutionEnvironment.getExecutionEnvironment\n\nenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)\n//每5秒生成一个Watermark\nenv.getConfig.setAutoWatermarkInterval(5000L)\n\n//股票价格数据流\nvalstockPriceRawStream:DataStream[StockPrice]=env\n//该数据流由StockPriceSource类随机生成\n.addSource(newStockPriceSource)\n//设置Timestamp和Watermark\n.assignTimestampsAndWatermarks(newStockPriceTimeAssigner)\n\nvalstockPriceStream:DataStream[StockPrice]=stockPriceRawStream\n.keyBy(_.symbol)\n//设置5秒的时刻窗口\n.window(TumblingEventTimeWindows.of(Time.seconds(5)))\n//取5秒内某一只股票的最大值\n.max("price")\n\n//打印成果\nstockPriceStream.print()\n\n//履行程序\nenv.execute("Computemaxstockprice")\n}\n\nclassStockPriceSourceextendsRichSourceFunction[StockPrice]{\n\nvarisRunning:Boolean=true\n\nvalrand=newRandom()\n//初始化股票价格\nvarpriceList:List[Double]=List(100.0d,200.0d,300.0d,400.0d,500.0d)\nvarstockId=0\nvarcurPrice=0.0d\n\noverridedefrun(srcCtx:SourceContext[StockPrice]):Unit={\n\nwhile(isRunning){\n//每次从列表中随机挑选一只股票\nstockId=rand.nextInt(priceList.size)\n\nvalcurPrice=priceList(stockId)+rand.nextGaussian()*0.05\npriceList=priceList.updated(stockId,curPrice)\nvalcurTime=Calendar.getInstance.getTimeInMillis\n\n//将数据源搜集写入SourceContext\nsrcCtx.collect(StockPrice("symbol_"+stockId.toString,curTime,curPrice))\nThread.sleep(rand.nextInt(10))\n}\n}\n\noverridedefcancel():Unit={\nisRunning=false\n}\n}\n\nclassStockPriceTimeAssignerextendsBoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)){\noverridedefextractTimestamp(t:StockPrice):Long=t.timestamp\n}\n\n}

发布于 2024-03-06 08:03:29
收藏
分享
海报
1
目录