博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(三)
阅读量:5931 次
发布时间:2019-06-19

本文共 7087 字,大约阅读时间需要 23 分钟。

JSON数据集

Spark SQL在加载JSON数据的时候,可以自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,即可实现这一转换。

注意,通常所说的json文件只是包含一些json数据的文件,而不是我们所需要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。因此,一个常规的多行json文件经常会加载失败。

// sc是已有的SparkContext对象val sqlContext = new org.apache.spark.sql.SQLContext(sc)// 数据集是由路径指定的// 路径既可以是单个文件,也可以还是存储文本文件的目录val path = "examples/src/main/resources/people.json"val people = sqlContext.read.json(path)// 推导出来的schema,可由printSchema打印出来people.printSchema()// root//  |-- age: integer (nullable = true)//  |-- name: string (nullable = true)// 将DataFrame注册为tablepeople.registerTempTable("people")// 跑SQL语句吧!val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// 另一种方法是,用一个包含JSON字符串的RDD来创建DataFrameval anotherPeopleRDD = sc.parallelize(  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Hive表

Spark SQL支持从读写数据。然而,Hive依赖项太多,所以没有把Hive包含在默认的Spark发布包里。要支持Hive,需要在编译spark的时候增加-Phive和-Phive-thriftserver标志。这样编译打包的时候将会把Hive也包含进来。注意,hive的jar包也必须出现在所有的worker节点上,访问Hive数据时候会用到(如:使用hive的序列化和反序列化SerDes时)。

Hive配置在conf/目录下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。请注意,如果在YARN cluster(yarn-cluster mode)模式下执行一个查询的话,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必须在驱动器(driver)和所有执行器(executor)都可用。一种简便的方法是,通过spark-submit命令的–jars和–file选项来提交这些文件。

如果使用Hive,则必须构建一个HiveContext,HiveContext是派生于SQLContext的,添加了在Hive Metastore里查询表的支持,以及对HiveQL的支持。用户没有现有的Hive部署,也可以创建一个HiveContext。如果没有在hive-site.xml里配置,那么HiveContext将会自动在当前目录下创建一个metastore_db目录,再根据HiveConf设置创建一个warehouse目录(默认/user/hive/warehourse)。所以请注意,你必须把/user/hive/warehouse的写权限赋予启动spark应用程序的用户。

// sc是一个已有的SparkContext对象val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// 这里用的是HiveQLsqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

和不同版本的Hive Metastore交互

Spark SQL对Hive最重要的支持之一就是和Hive metastore进行交互,这使得Spark SQL可以访问Hive表的元数据。从Spark-1.4.0开始,Spark SQL有专门单独的二进制build版本,可以用来访问不同版本的Hive metastore,其配置表如下。注意,不管所访问的hive是什么版本,Spark SQL内部都是以Hive 1.2.1编译的,而且内部使用的Hive类也是基于这个版本(serdes,UDFs,UDAFs等)

以下选项可用来配置Hive版本以便访问其元数据:

属性名 默认值 含义
spark.sql.hive.metastore.version 1.2.1 Hive metastore版本,可选的值为0.12.0 到 1.2.1
spark.sql.hive.metastore.jars builtin 初始化HiveMetastoreClient的jar包。这个属性可以是以下三者之一:

  1. builtin

目前内建为使用Hive-1.2.1,编译的时候启用-Phive,则会和spark一起打包。如果没有-Phive,那么spark.sql.hive.metastore.version要么是1.2.1,要就是未定义

  1. maven

使用maven仓库下载的jar包版本。这个选项建议不要再生产环境中使用

  1. JVM格式的classpath。这个classpath必须包含所有Hive及其依赖的jar包,且包含正确版本的hadoop。这些jar包必须部署在driver节点上,如果你使用yarn-cluster模式,那么必须确保这些jar包也随你的应用程序一起打包
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc
一个逗号分隔的类名前缀列表,这些类使用classloader加载,且可以在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就需要这种共享。其他需要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender
spark.sql.hive.metastore.barrierPrefixes (empty) 一个逗号分隔的类名前缀列表,这些类在每个Spark SQL所访问的Hive版本中都会被显式的reload。例如,某些在共享前缀列表(spark.sql.hive.metastore.sharedPrefixes)中声明为共享的Hive UD函数

用JDBC连接其他数据库

Spark SQL也可以用JDBC访问其他数据库。这一功能应该优先于使用JdbcRDD。因为它返回一个DataFrame,而DataFrame在Spark SQL中操作更简单,且更容易和来自其他数据源的数据进行交互关联。JDBC数据源在java和python中用起来也很简单,不需要用户提供额外的ClassTag。(注意,这与Spark SQL JDBC server不同,Spark SQL JDBC server允许其他应用执行Spark SQL查询)

首先,你需要在spark classpath中包含对应数据库的JDBC driver,下面这行包括了用于访问postgres的数据库driver

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

远程数据库的表可以通过Data Sources API,用DataFrame或者SparkSQL 临时表来装载。以下是选项列表:

属性名 含义
url 需要连接的JDBC URL
dbtable 需要读取的JDBC表。注意,任何可以填在SQL的where子句中的东西,都可以填在这里。(既可以填完整的表名,也可填括号括起来的子查询语句)
driver JDBC driver的类名。这个类必须在master和worker节点上都可用,这样各个节点才能将driver注册到JDBC的子系统中。
partitionColumn, lowerBound, upperBound, numPartitions 这几个选项,如果指定其中一个,则必须全部指定。他们描述了多个worker如何并行的读入数据,并将表分区。partitionColumn必须是所查询的表中的一个数值字段。注意,lowerBound和upperBound只是用于决定分区跨度的,而不是过滤表中的行。因此,表中所有的行都会被分区然后返回。
fetchSize JDBC fetch size,决定每次获取多少行数据。在JDBC驱动上设成较小的值有利于性能优化(如,Oracle上设为10)
val jdbcDF = sqlContext.read.format("jdbc").options(  Map("url" -> "jdbc:postgresql:dbserver",  "dbtable" -> "schema.tablename")).load()

疑难解答

  • JDBC driver class必须在所有client session或者executor上,对java的原生classloader可见。这是因为Java的DriverManager在打开一个连接之前,会做安全检查,并忽略所有对原声classloader不可见的driver。最简单的一种方法,就是在所有worker节点上修改compute_classpath.sh,并包含你所需的driver jar包。
  • 一些数据库,如H2,会把所有的名字转大写。对于这些数据库,在Spark SQL中必须也使用大写。

性能调整

对于有一定计算量的Spark作业来说,可能的性能改进的方式,不是把数据缓存在内存里,就是调整一些开销较大的选项参数。

内存缓存

Spark SQL可以通过调用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存储格式缓存到内存中。随后,Spark SQL将会扫描必要的列,并自动调整压缩比例,以减少内存占用和GC压力。你也可以用SQLContext.uncacheTable(“tableName”)来删除内存中的table。

你还可以使用SQLContext.setConf 或在SQL语句中运行SET key=value命令,来配置内存中的缓存。

属性名 默认值 含义
spark.sql.inMemoryColumnarStorage.compressed true 如果设置为true,Spark SQL将会根据数据统计信息,自动为每一列选择单独的压缩编码方式。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式缓存批量的大小。增大批量大小可以提高内存利用率和压缩率,但同时也会带来OOM(Out Of Memory)的风险。

其他配置选项

以下选项同样也可以用来给查询任务调性能。不过这些选项在未来可能被放弃,因为spark将支持越来越多的自动优化。

属性名 默认值 含义
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置join操作时,能够作为广播变量的最大table的大小。设置为-1,表示禁用广播。注意,目前的元数据统计仅支持Hive metastore中的表,并且需要运行这个命令:ANALYSE TABLE <tableName> COMPUTE STATISTICS noscan
spark.sql.tungsten.enabled true 设为true,则启用优化的Tungsten物理执行后端。Tungsten会显式的管理内存,并动态生成表达式求值的字节码
spark.sql.shuffle.partitions 200 配置数据混洗(shuffle)时(join或者聚合操作),使用的分区数。

分布式SQL引擎

Spark SQL可以作为JDBC/ODBC或者命令行工具的分布式查询引擎。在这种模式下,终端用户或应用程序,无需写任何代码,就可以直接在Spark SQL中运行SQL查询。

运行Thrift JDBC/ODBC server

这里实现的Thrift JDBC/ODBC server和Hive-1.2.1中的是相同的。你可以使用beeline脚本来测试Spark或者Hive-1.2.1的JDBC server。

在Spark目录下运行下面这个命令,启动一个JDBC/ODBC server

./sbin/start-thriftserver.sh

这个脚本能接受所有 bin/spark-submit 命令支持的选项参数,外加一个 –hiveconf 选项,来指定Hive属性。运行./sbin/start-thriftserver.sh –help可以查看完整的选项列表。默认情况下,启动的server将会在localhost:10000端口上监听。要改变监听主机名或端口,可以用以下环境变量:

export HIVE_SERVER2_THRIFT_PORT=
export HIVE_SERVER2_THRIFT_BIND_HOST=
./sbin/start-thriftserver.sh \ --master
\ ...

或者Hive系统属性 来指定

./sbin/start-thriftserver.sh \  --hiveconf hive.server2.thrift.port=
\ --hiveconf hive.server2.thrift.bind.host=
\ --master
...

接下来,你就可以开始在beeline中测试这个Thrift JDBC/ODBC server:

./bin/beeline

下面的指令,可以连接到一个JDBC/ODBC server

beeline> !connect jdbc:hive2://localhost:10000

可能需要输入用户名和密码。在非安全模式下,只要输入你本机的用户名和一个空密码即可。对于安全模式,请参考.

Hive的配置是在conf/目录下的hive-site.xml,core-site.xml,hdfs-site.xml中指定的。

你也可以在beeline的脚本中指定。

Thrift JDBC server也支持通过HTTP传输Thrift RPC消息。以下配置(在conf/hive-site.xml中)将启用HTTP模式:

hive.server2.transport.mode - Set this to value: httphive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001hive.server2.http.endpoint - HTTP endpoint; default is cliservice

同样,在beeline中也可以用HTTP模式连接JDBC/ODBC server:

beeline> !connect jdbc:hive2://
:
/
?hive.server2.transport.mode=http;hive.server2.thrift.http.path=

转载自 
你可能感兴趣的文章
vs2008 x64编译环境 忽略了 #ifdef WIN32
查看>>
【微信小程序】再次授权地理位置getLocation+openSetting使用
查看>>
手机端上传图片及java后台接收和ajaxForm提交
查看>>
HDU 5030 Rabbit's String
查看>>
【MSDN 目录】C#编程指南、C#教程、ASP.NET参考、ASP.NET 4、.NET Framework类库
查看>>
windows服务 2.实时刷新App.config
查看>>
jquery 怎么触发select的change事件
查看>>
angularjs指令(二)
查看>>
(原創) 如何建立一个thread? (OS) (Linux) (C/C++) (C)
查看>>
<气场>读书笔记
查看>>
vue-cli创建的项目,如何让键盘监听事件,只在一个页面(url)内有效,如下图和代码...
查看>>
实现一个平行四边形
查看>>
领域驱动设计,构建简单的新闻系统,20分钟够吗?
查看>>
web安全问题分析与防御总结
查看>>
React 组件通信之 React context
查看>>
ZooKeeper 可视化监控 zkui
查看>>
Linux下通过配置Crontab实现进程守护
查看>>
ios 打包上传Appstore 时报的错误 90101 90149
查看>>
Oracle推出轻量级Java微服务框架Helidon
查看>>
密码概述
查看>>