Building Jinhua's Data Applications on a Big Data Middle Platform
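The ingestion layer collects readings from city sensor networks with Apache Flume. The agent configuration below wires a netcat source (listening on a local TCP port) through an in-memory channel into an HDFS sink, so raw sensor events land under /data/sensor for downstream processing. It is a minimal sketch: the port, capacities, and HDFS path are the example's own values and would be replaced by the real sensor endpoints in production.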
# Name the components of this Flume agent
agent.sources = sensorSource
agent.channels = memoryChannel
agent.sinks = hdfsSink

# Netcat source: listens for sensor readings on a local TCP port
agent.sources.sensorSource.type = netcat
agent.sources.sensorSource.bind = localhost
agent.sources.sensorSource.port = 44444
agent.sources.sensorSource.channels = memoryChannel

# In-memory channel buffering events between source and sink
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 100

# HDFS sink: writes events to /data/sensor, rolling a new file every 60 seconds
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path = /data/sensor
agent.sinks.hdfsSink.hdfs.filePrefix = sensor_data
agent.sinks.hdfsSink.hdfs.rollInterval = 60
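Assuming the configuration above is saved as sensor-agent.conf (a hypothetical file name), the agent can be started with the standard Flume launcher; the --name value must match the agent. prefix used in the properties file:

flume-ng agent --conf conf --conf-file sensor-agent.conf --name agent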
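Once the raw readings are on HDFS, the analysis layer runs on Spark. The PySpark job below loads the air-quality CSV, drops incomplete rows, and ranks regions by their peak pollution level; the file path and the region and pollution_level columns follow the example data layout above.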
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize the SparkSession
spark = SparkSession.builder \
    .appName("Jinhua Air Quality Analysis") \
    .getOrCreate()

# Load the sensor data collected by Flume; inferSchema parses
# pollution_level as a number, so max() compares values numerically
# instead of lexicographically
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/data/sensor/air_quality.csv")

# Data cleaning: drop rows with missing values
cleaned_df = df.dropna()

# Rank regions by their highest recorded pollution level
pollution_analysis = cleaned_df.groupBy("region") \
    .agg(F.max("pollution_level").alias("max_pollution_level")) \
    .orderBy(F.desc("max_pollution_level"))

# Display the results
pollution_analysis.show()
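In a middle-platform setting, the aggregated result would typically be persisted for reuse by dashboards and other applications rather than only printed. A minimal sketch, assuming a hypothetical output directory /data/analysis/pollution_by_region:

# Persist the ranking as Parquet so downstream services can query it
pollution_analysis.write.mode("overwrite").parquet("/data/analysis/pollution_by_region")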