方法一:从hdfs读取
# -*- coding: utf-8 -* from pyspark.sql import SparkSession, HiveContext,DataFrameWriter import argparse import time import numpy as np import pandas as pd spark = SparkSession.builder.enableHiveSupport().appName("test").getOrCreate() start = time.time() ### 数据载入方法1: hdfs上载入parquent格式 input = "/aaa/bbb/ccc" data = spark.read.parquet(input) data.show(5) +-------------------+------+--------------------+ | START_TIME|amount| payerCode| +-------------------+------+--------------------+ |2019-06-28 21:04:37| 10.7|692200000XXXXXXX| |2018-11-24 20:15:40| 19.9|602200000XXXXXXX| |2019-06-19 12:33:14| 2.0|692200000XXXXXXX| |2019-07-03 23:04:12| 5.27|622200000XXXXXXX| |2018-11-26 21:26:30| 2.0|622200000XXXXXXX| +-------------------+------+--------------------+ ## pyspark读取数据方法二:从hive中读取
方法二:数据从数据库读取
####### 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句 hive_context= HiveContext(spark) hive_read = "select * from {}.{}".format(hive_database, hive_table2) ####### 通过SQL语句在hive中查询的数据直接是dataframe的形式 read_df = hive_context.sql(hive_read) read_df.show(5) +-------------------+------+--------------------+ | START_TIME|amount| payerCode| +-------------------+------+--------------------+ |2019-06-28 21:04:37| 10.7|692200000XXXXXXX| |2018-11-24 20:15:40| 19.9|602200000XXXXXXX| |2019-06-19 12:33:14| 2.0|692200000XXXXXXX| |2019-07-03 23:04:12| 5.27|622200000XXXXXXX| |2018-11-26 21:26:30| 2.0|622200000XXXXXXX| +-------------------+------+--------------------+
方法3:读取hdfs上的csv文件
tttt = spark.read.csv(filepath,header=’true’,inferSchema=’true’,sep=’,’)
方法1: 以parquent格式存储到hdfs
data1.write.mode(SaveMode.Overwrite).parquet(output)
方法2:以Table的格式存入hive数据库
##### 数据存入数据库 hive_database = "testt0618" data1 = data.limit(10)
1: 用saveAsTable()方法存入hive数据库
hive_table1 = "ii" data1.write.format("hive").mode("overwrite").saveAsTable('{}.{}'.format(hive_database, hive_table1))
2:利用sql语句存入hive数据库
hive_table2 = "lll" data1.registerTempTable('test_hive') sqlContext.sql("create table {}.{} select * from test_hive".format(hive_database, hive_table2))
方法3:以csv格式存储到hdfs
output = “/aaa/bbb/ccc” data1.coalesce(1).write.option("sep", "#").option("header", "true").csv(output + "_text",mode='overwrite')
参考相关:
- www.zzvips.com/article/73466.html
- https://zhuanlan.zhihu.com/p/34901558
热门文章
- 狗粮批发价在哪里找(狗粮哪里有卖)
- 「1月4日」最高速度21.5M/S,2025年V2ray/SSR/Clash/Shadowrocket每天更新免费节点订阅链接
- 「2月10日」最高速度19.6M/S,2025年Clash/SSR/Shadowrocket/V2ray每天更新免费节点订阅链接
- 动物疫苗属于疫苗分类吗为什么没有营养(动物疫苗包括哪些)
- 动物防疫的名词解释(动物防疫的概念)
- 宠物家庭寄养协议书范本下载(宠物寄养协议有法律效力吗)
- 动物医院取什么名字比较好一点(动物医院取什么名字比较好一点的)
- 猫猫做驱虫多少钱(宠物猫做驱虫多少钱)
- 驱虫给猫多少钱(驱虫猫多少钱一次)
- 猫粮狗粮零售如何挣钱(狗粮猫粮的销售和利润怎么样)