这种统计可以用计数器完成,以下代码没什么业务逻辑,纯属实验
package cn.liangc.hadoop.nmr;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 测试输入数据格式:
* ...
* 183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
* ...
* 输出数据格式
* ...
* 日期 独立IP个数
* ...
* @author liangchuan
*/
public class NginxAccessLogMR {
public static class Map01 extends Mapper<LongWritable, Text, Text, Text> {
private Date getDateByValue(String vs) throws ParseException {
String date = vs.substring(vs.indexOf("["), vs.indexOf("]") + 1);
SimpleDateFormat format = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]", Locale.US);
Date d = format.parse(date);
return d;
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String vs = value.toString();
String[] arr = vs.split("- -");
// String k = arr[0].trim();// IP
String v = arr[1].trim();// others
Date d = getDateByValue(vs);// DATE
SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
Text k = new Text(f.format(d));
// 以日期分组
context.write(k, new Text(vs));
} catch (Exception e) {
System.out.println("MAPPER ++++++++++++++++++++++++++"+e.getMessage());
}
}
}
public static class Reduce01 extends Reducer<Text, Text, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<String,String> m = new HashMap<String,String>();
for(Text value : values){
String vs = value.toString();
String[] arr = vs.split("- -");
String ip = arr[0].trim();// IP
m.put(ip, "");
}
System.out.println("RRRRRRR <><> "+m);
context.write(key, new IntWritable(m.size()));
}
}
/**
* 文件名过滤
*
* @author liangchuan
*
*/
public static class MyPathFilter implements PathFilter, Configurable {
Configuration conf = null;
FileSystem fs = null;
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public boolean accept(Path path) {
try {
fs = FileSystem.get(conf);
FileStatus fileStatus = fs.getFileStatus(path);
if (!fileStatus.isDir()) {
String fileName = path.getName();
if (!fileName.contains(conf.get("pathfilter.pattern"))) {
return true;
}
}
} catch (IOException e) {
System.out.println("MyPathFilter ++++++++++++++++++++++++++");
e.printStackTrace();
}
return false;
}
}
public static void main(String[] args) {
// JobConf conf = new JobConf(MaxTptr.class);
Job job = null;
try {
job = new Job();
job.setJarByClass(NginxAccessLogMR.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(Map01.class);
job.setReducerClass(Reduce01.class);
/**
* map 的输出如果跟 reduce 的输出不一致则必须要做此步配置,否则会按照 reduce 的输出进行默认
*/
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 第三个参数是要过滤的文件名关键字,默认error
String pfk = args.length > 2 ? args[2] : "error";
job.getConfiguration().set("pathfilter.pattern", pfk);
FileInputFormat.setInputPathFilter(job, MyPathFilter.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
分享到:
相关推荐
基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...
MR处理HDFS日志样例
基于Hadoop网站流量日志数据分析系统项目源码+教程.zip网站流量日志数据分析系统 典型的离线流数据分析系统 技术分析 hadoop nginx flume hive sqoop mysql springboot+mybatisplus+vcharts 基于Hadoop网站流量日志...
hadoop权威指南样例源代码TDG2example
基于Hadoop 集群的日志分析系统的设计与实现.docx基于Hadoop 集群的日志分析系统的设计与实现.docx基于Hadoop 集群的日志分析系统的设计与实现.docx基于Hadoop 集群的日志分析系统的设计与实现.docx基于Hadoop 集群...
基于Hadoop的Web日志挖掘 海量数据 访问日志分析
利用hadoop集群处理分析日志文件
基于Hadoop的Web日志挖掘.pdf
的有用信息也难以发现 分布式计算技术正好可以用来解决这一难题 阐述了 syslog 日志收集流程 详细介绍了 Hadoop 分布式计 算框架 设计并实现了一套基于 Hadoop 的网络日志分析系统 实验证明该系统是有效而实用的
在处理海量数据的时候,传统的单机方法面临着数据存储和计算的瓶颈。本文提出了 利用开源框架Hadoop 处理海量数据方法,以弥补传统方法在这方面的缺陷与不足.
NULL 博文链接:https://dodomail.iteye.com/blog/1744269
Flume采集Nginx日志到新版Hive,Flume中需要添加的Jar包,各软件版本为:Hadoop 3.2.0、Flume 1.9.0、Hive 3.1.2、Nginx 1.17.2。
至此,我们通过Python网络爬虫手段进行数据抓取,将我们网站数据(2013-05-30,2013-05-31)保存为两个日志文件,由于文件大小超出我们一般的分析工具处理的范围,故借助Hadoop来完成本次的实践。 2. 总体设计 2.1 ...
基于Hadoop的海量日志数据处理,王小森,,在处理海量数据的时候,传统的单机方法面临着数据存储和计算的瓶颈。本文提出了利用开源框架Hadoop处理海量数据方法,以弥补传统方
Hadoop在客票日志处理系统中的应用.pdf
nginx反向代理和负载均衡
基于Hadoop的Web日志分析项目源码(日志的清洗、统计分析、统计结果的导出、指标数据的Web展示)+项目说明.zip 包含如下 【主要分析统计的指标数据】 浏览量PV 访客数UV IP数 跳出率 【系统架构设计】 【数据库表结构...
Hadoop日志存放位置,存储规则和存储的位置,Hadoop日志存储修改等
第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件...
基于Hadoop集群的分布式日志分析系统研究