Scala 2.11.8 + Spark 2.3.1 + MongoDB Connector 2.3.0
Published: 2019-06-09

This article is about 2,433 characters long and takes roughly 8 minutes to read.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Read electricity-consumption records from MongoDB and save them into a Hive table.
// Notes on handling documents that repeat the same field:
//   https://blog.csdn.net/qq_14950717/article/details/62425563
//   https://blog.csdn.net/qq_27234661/article/details/78344435?locationNum=3&fps=1
object mongospark20180830consume_amount {
  def main(args: Array[String]): Unit = {
    // Submit with:
    // spark-submit --driver-class-path /usr/local/jdk/lib/mysql-connector-java-5.1.46.jar \
    //   --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 \
    //   --class "mongospark20180830consume_amount" /testdata/u3.jar
    // sbt dependency: "org.mongodb.spark" %% "mongo-spark-connector" % "2.3.0"

    val sparkConf = new SparkConf().setAppName("adver").setMaster("local[*]")
    val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // MongoDB source: the saas.elemeterPowerHistory collection on an Aliyun DDS instance.
    val inputUri = "mongodb://saas:saas2018yundinglixin@dds-m5e6e56a3b0cf7b42784-pub.mongodb.rds.aliyuncs.com:3717/saas.elemeterPowerHistory"

    val df = spark.read.format("com.mongodb.spark.sql")
      .options(Map(
        "spark.mongodb.input.uri" -> inputUri,
        "spark.mongodb.input.partitioner" -> "MongoPaginateBySizePartitioner",
        "spark.mongodb.input.partitionerOptions.partitionKey" -> "_id",
        "spark.mongodb.input.partitionerOptions.partitionSizeMB" -> "32"))
      .load()

    // Keep only the columns we need.
    val df2 = df.select("time", "uuid", "consume_amount", "room_id")
      .toDF("time", "uuid", "consume_amount", "room_id")

    // Write the result into the saas Hive database.
    spark.sql("use saas")
    df2.write.mode("overwrite").saveAsTable("consume_amount20180831")

    // Follow-up queries kept from the original, commented out:
    // val rddf = spark.sql("select uuid, from_unixtime(cast(`time`/1000 AS bigint),'yyyyMMddHH'), consume_amount " +
    //   "from consume where time >= 1533115788000").toDF("uuid", "time", "consume_amount")
    // rddf.write.saveAsTable("consume_amount20180830")
    // val select = spark.sql("select consume_amount from elem limit 5").take(5)
    // select.foreach(println)
  }
}
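If the job is built with sbt instead of relying on --packages at submit time, a minimal build.sbt along the following lines should work. This is a sketch: the Spark artifact versions and the "provided" scope are assumptions based on the versions in the title (Scala 2.11.8, Spark 2.3.1); only the mongo-spark-connector coordinate is taken from the comment in the code above.

// Minimal build.sbt sketch (versions assumed from the post title; adjust to your cluster).
name := "mongospark20180830consume_amount"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.3.1" % "provided",  // assumed: Spark 2.3.1 from the title
  "org.apache.spark" %% "spark-sql"  % "2.3.1" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.3.1" % "provided",  // required for enableHiveSupport()
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.3.0"    // from the comment in the code above
)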

 

Reposted from: https://www.cnblogs.com/canyangfeixue/p/5691865.html
