西部数码主机 | 阿里云主机| 虚拟主机 | 服务器 | 返回乐道官网
当前位置: 主页 > 开发教程 > mysql教程 >

Spark读取数据库(Mysql)的四种方式讲解

时间:2016-01-07 16:14来源:未知 作者:好模板 点击:
现在Spark支撑四种办法从数据库中读取数据,这里以Mysql为例进行介绍。 一、不指定查询条件 这个办法连接MySql的函数原型是: def jdbc(url: String, table: String, properties: Properties): DataFrame 咱

现在Spark支撑四种办法从数据库中读取数据,这里以Mysql为例进行介绍。

 

一、不指定查询条件

 

这个办法连接MySql的函数原型是:

def jdbc(url: String, table: String, properties: Properties): DataFrame

  咱们只需求供给Driver的url,需求查询的表名,以及连接表有关特点properties。下面是详细比如:

val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

val prop = new Properties()

val df = sqlContext.read.jdbc(url, "iteblog", prop )

 

println(df.count())

println(df.rdd.partitions.size)

 

咱们运转上面的程序,能够看到df.rdd.partitions.size输出成果是1,这个成果的意义是iteblog表的所有数据都是由RDD的一个分区处理的,所以说,假如你这个表很大,很可能会呈现OOM

 

WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 14, spark047219):

java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)

 

这种办法在数据量大的时分不主张运用。

 

二、指定数据库字段的规模

 

这种办法即是经过指定数据库中某个字段的规模,可是惋惜的是,这个字段有必要是数字,来看看这个函数的函数原型:

 

def jdbc(

url: String,

table: String,

columnName: String,

lowerBound: Long,

upperBound: Long,

numPartitions: Int,

connectionProperties: Properties): DataFrame

 

前两个字段的意义和办法一相似。columnName即是需求分区的字段,这个字段在数据库中的类型有必要是数字;lowerBound即是分区的下界;upperBound即是分区的上界;numPartitions是分区的个数。同样,咱们也来看看怎么运用:

 

val lowerBound = 1

val upperBound = 100000

val numPartitions = 5

val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

 

val prop = new Properties()

val df = sqlContext.read.jdbc(url, "iteblog", "id", lowerBound, upperBound, numPartitions, prop)

 

这个办法能够将iteblog表的数据散布到RDD的几个分区中,分区的数量由numPartitions参数决议,在抱负情况下,每个分区处理一样数量的数据,咱们在运用的时分不主张将这个值设置的比较大,由于这可能致使数据库挂掉!可是根据前面介绍,这个函数的缺陷即是只能运用整形数据字段作为分区关键词。

 

这个函数在极端情况下,也即是设置将numPartitions设置为1,其意义和第一种办法共同。

 

三、根据恣意字段进行分区

 

根据前面两种办法的约束,Spark还供给了根据恣意字段进行分区的办法,函数原型如下:

 

def jdbc(

url: String,

table: String,

predicates: Array[String],

connectionProperties: Properties): DataFrame

 

这个函数相比第一种办法多了predicates参数,咱们能够经过这个参数设置分区的根据,来看看比如:

 

val predicates = Array[String]("reportDate <= '2014-12-31'",

"reportDate > '2014-12-31' and reportDate <= '2015-12-31'")

val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

 

val prop = new Properties()

val df = sqlContext.read.jdbc(url, "iteblog", predicates, prop)

 

最终rdd的分区数量就等于predicates.length。

四、经过load获取

 

Spark还供给经过load的办法来读取数据。

 

sqlContext.read.format("jdbc").options(

Map("url" -> "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog",

"dbtable" -> "iteblog")).load()

 

options函数支撑url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions选项,仔细的同学必定发现这个和办法二的参数共同。是的,其内部完成原理部分和办法二大体共同。一起load办法还支撑json、orc等数据源的读取

 

 

(责任编辑:好模板)
顶一下
(0)
0%
踩一下
(0)
0%
------分隔线----------------------------
栏目列表
热点内容